From 8e9b02f128d7b485bd357b71fdc58a0edf54fbb0 Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Wed, 27 Dec 2023 07:56:20 -0500 Subject: [PATCH] feat: Introduce NodeBuilder (#5824) --- Cargo.lock | 1 + bin/reth/Cargo.toml | 1 + bin/reth/src/args/rpc_server_args.rs | 25 + bin/reth/src/cli/db_type.rs | 108 ++ bin/reth/src/cli/mod.rs | 2 + bin/reth/src/cli/node_builder.rs | 1354 ++++++++++++++++++++++++++ bin/reth/src/dirs.rs | 5 + bin/reth/src/node/mod.rs | 896 ++--------------- bin/reth/src/utils.rs | 29 +- crates/net/network/src/manager.rs | 31 +- crates/storage/db/src/lib.rs | 10 +- crates/tracing/src/lib.rs | 3 +- 12 files changed, 1618 insertions(+), 847 deletions(-) create mode 100644 bin/reth/src/cli/db_type.rs create mode 100644 bin/reth/src/cli/node_builder.rs diff --git a/Cargo.lock b/Cargo.lock index d1dd2d7c6..c9a327e6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5608,6 +5608,7 @@ dependencies = [ "metrics-exporter-prometheus", "metrics-process", "metrics-util", + "once_cell", "pin-project", "pretty_assertions", "procfs", diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index 966f0cd0b..d1747b069 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -74,6 +74,7 @@ metrics-util = "0.15.0" metrics-process = "1.0.9" reth-metrics.workspace = true metrics.workspace = true +once_cell.workspace = true # test vectors generation proptest.workspace = true diff --git a/bin/reth/src/args/rpc_server_args.rs b/bin/reth/src/args/rpc_server_args.rs index 54f0f040f..4a6e47c41 100644 --- a/bin/reth/src/args/rpc_server_args.rs +++ b/bin/reth/src/args/rpc_server_args.rs @@ -179,6 +179,31 @@ pub struct RpcServerArgs { } impl RpcServerArgs { + /// Enables the HTTP-RPC server. + pub fn with_http(mut self) -> Self { + self.http = true; + self + } + + /// Enables the WS-RPC server. + pub fn with_ws(mut self) -> Self { + self.ws = true; + self + } + + /// Change rpc port numbers based on the instance number. + /// + /// Warning: if `instance` is zero, this will panic. + pub fn adjust_instance_ports(&mut self, instance: u16) { + debug_assert_ne!(instance, 0, "instance must be non-zero"); + // auth port is scaled by a factor of instance * 100 + self.auth_port += instance * 100 - 100; + // http port is scaled by a factor of -instance + self.http_port -= instance - 1; + // ws port is scaled by a factor of instance * 2 + self.ws_port += instance * 2 - 2; + } + /// Configures and launches _all_ servers. /// /// Returns the handles for the launched regular RPC server(s) (if any) and the server handle diff --git a/bin/reth/src/cli/db_type.rs b/bin/reth/src/cli/db_type.rs new file mode 100644 index 000000000..a852f88c2 --- /dev/null +++ b/bin/reth/src/cli/db_type.rs @@ -0,0 +1,108 @@ +//! A real or test database type + +use crate::dirs::{ChainPath, DataDirPath, MaybePlatformPath}; +use reth_db::{ + init_db, + test_utils::{create_test_rw_db, TempDatabase}, + DatabaseEnv, +}; +use reth_interfaces::db::LogLevel; +use reth_primitives::Chain; +use std::{str::FromStr, sync::Arc}; + +/// A type that represents either a _real_ (represented by a path), or _test_ database, which will +/// use a [TempDatabase]. +#[derive(Debug)] +pub enum DatabaseType { + /// The real database type + Real(MaybePlatformPath), + /// The test database type + Test, +} + +/// The [Default] implementation for [DatabaseType] uses the _real_ variant, using the default +/// value for the inner [MaybePlatformPath]. +impl Default for DatabaseType { + fn default() -> Self { + Self::Real(MaybePlatformPath::::default()) + } +} + +impl DatabaseType { + /// Creates a _test_ database + pub fn test() -> Self { + Self::Test + } +} + +/// Type that represents a [DatabaseType] and [LogLevel], used to build a database type +pub struct DatabaseBuilder { + /// The database type + db_type: DatabaseType, +} + +impl DatabaseBuilder { + /// Creates the [DatabaseBuilder] with the given [DatabaseType] + pub fn new(db_type: DatabaseType) -> Self { + Self { db_type } + } + + /// Initializes and returns the [DatabaseInstance] depending on the current database type. If + /// the [DatabaseType] is test, the [LogLevel] is not used. + /// + /// If the [DatabaseType] is test, then the [ChainPath] constructed will be derived from the db + /// path of the [TempDatabase] and the given chain. + pub fn build_db( + self, + log_level: Option, + chain: Chain, + ) -> eyre::Result { + match self.db_type { + DatabaseType::Test => { + let db = create_test_rw_db(); + let db_path_str = db.path().to_str().expect("Path is not valid unicode"); + let path = MaybePlatformPath::::from_str(db_path_str) + .expect("Path is not valid"); + let data_dir = path.unwrap_or_chain_default(chain); + + Ok(DatabaseInstance::Test { db, data_dir }) + } + DatabaseType::Real(path) => { + let data_dir = path.unwrap_or_chain_default(chain); + + tracing::info!(target: "reth::cli", path = ?data_dir, "Opening database"); + let db = Arc::new(init_db(data_dir.clone(), log_level)?); + Ok(DatabaseInstance::Real { db, data_dir }) + } + } + } +} + +/// A constructed database type, with a [ChainPath]. +#[derive(Debug, Clone)] +pub enum DatabaseInstance { + /// The test database + Test { + /// The database + db: Arc>, + /// The data dir + data_dir: ChainPath, + }, + /// The real database + Real { + /// The database + db: Arc, + /// The data dir + data_dir: ChainPath, + }, +} + +impl DatabaseInstance { + /// Returns the data dir for this database instance + pub fn data_dir(&self) -> &ChainPath { + match self { + Self::Test { data_dir, .. } => data_dir, + Self::Real { data_dir, .. } => data_dir, + } + } +} diff --git a/bin/reth/src/cli/mod.rs b/bin/reth/src/cli/mod.rs index 14f25a279..0cd519433 100644 --- a/bin/reth/src/cli/mod.rs +++ b/bin/reth/src/cli/mod.rs @@ -21,7 +21,9 @@ use std::{fmt, fmt::Display, sync::Arc}; pub mod components; pub mod config; +pub mod db_type; pub mod ext; +pub mod node_builder; /// Default [directives](Directive) for [EnvFilter] which disables high-frequency debug logs from /// `hyper` and `trust-dns` diff --git a/bin/reth/src/cli/node_builder.rs b/bin/reth/src/cli/node_builder.rs new file mode 100644 index 000000000..f72a863b1 --- /dev/null +++ b/bin/reth/src/cli/node_builder.rs @@ -0,0 +1,1354 @@ +//! Support for customizing the node +use super::{ + components::RethRpcServerHandles, db_type::DatabaseType, ext::DefaultRethNodeCommandConfig, +}; +use crate::{ + args::{ + get_secret_key, DatabaseArgs, DebugArgs, DevArgs, NetworkArgs, PayloadBuilderArgs, + PruningArgs, RpcServerArgs, TxPoolArgs, + }, + cli::{ + components::RethNodeComponentsImpl, + config::{RethRpcConfig, RethTransactionPoolConfig}, + db_type::{DatabaseBuilder, DatabaseInstance}, + ext::{RethCliExt, RethNodeCommandConfig}, + }, + dirs::{ChainPath, DataDirPath, MaybePlatformPath}, + init::init_genesis, + node::{cl_events::ConsensusLayerHealthEvents, events}, + prometheus_exporter, + utils::{get_single_header, write_peers_to_file}, + version::SHORT_VERSION, +}; +use eyre::Context; +use fdlimit::raise_fd_limit; +use futures::{future::Either, stream, stream_select, StreamExt}; +use metrics_exporter_prometheus::PrometheusHandle; +use once_cell::sync::Lazy; +use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus, MiningMode}; +use reth_beacon_consensus::{ + hooks::{EngineHooks, PruneHook}, + BeaconConsensus, BeaconConsensusEngine, BeaconConsensusEngineError, + MIN_BLOCKS_FOR_PIPELINE_RUN, +}; +use reth_blockchain_tree::{ + config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree, +}; +use reth_config::{ + config::{PruneConfig, StageConfig}, + Config, +}; +use reth_db::{ + database::Database, + database_metrics::{DatabaseMetadata, DatabaseMetrics}, +}; +use reth_downloaders::{ + bodies::bodies::BodiesDownloaderBuilder, + headers::reverse_headers::ReverseHeadersDownloaderBuilder, +}; +use reth_interfaces::{ + blockchain_tree::BlockchainTreeEngine, + consensus::Consensus, + p2p::{ + bodies::{client::BodiesClient, downloader::BodyDownloader}, + either::EitherDownloader, + headers::{client::HeadersClient, downloader::HeaderDownloader}, + }, + RethResult, +}; +use reth_network::{NetworkBuilder, NetworkConfig, NetworkEvents, NetworkHandle, NetworkManager}; +use reth_network_api::{NetworkInfo, PeersInfo}; +use reth_primitives::{ + constants::eip4844::{LoadKzgSettingsError, MAINNET_KZG_TRUSTED_SETUP}, + kzg::KzgSettings, + stage::StageId, + BlockHashOrNumber, BlockNumber, ChainSpec, DisplayHardforks, Head, SealedHeader, TxHash, B256, + MAINNET, +}; +use reth_provider::{ + providers::BlockchainProvider, BlockHashReader, BlockReader, + BlockchainTreePendingStateProvider, CanonStateSubscriptions, HeaderProvider, HeaderSyncMode, + ProviderFactory, StageCheckpointReader, +}; +use reth_prune::PrunerBuilder; +use reth_revm::EvmProcessorFactory; +use reth_revm_inspectors::stack::Hook; +use reth_rpc_engine_api::EngineApi; +use reth_stages::{ + prelude::*, + stages::{ + AccountHashingStage, ExecutionStage, ExecutionStageThresholds, IndexAccountHistoryStage, + IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage, + TotalDifficultyStage, TransactionLookupStage, + }, + MetricEvent, +}; +use reth_tasks::{TaskExecutor, TaskManager}; +use reth_transaction_pool::{ + blobstore::InMemoryBlobStore, EthTransactionPool, TransactionPool, + TransactionValidationTaskExecutor, +}; +use secp256k1::SecretKey; +use std::{ + net::{SocketAddr, SocketAddrV4}, + path::PathBuf, + sync::Arc, +}; +use tokio::{ + runtime::Handle, + sync::{ + mpsc::{unbounded_channel, Receiver, UnboundedSender}, + oneshot, watch, + }, +}; +use tracing::*; + +/// The default prometheus recorder handle. We use a global static to ensure that it is only +/// installed once. +pub static PROMETHEUS_RECORDER_HANDLE: Lazy = + Lazy::new(|| prometheus_exporter::install_recorder().unwrap()); + +/// Start the node +#[derive(Debug)] +pub struct NodeBuilder { + /// The test database + pub database: DatabaseType, + + /// The path to the configuration file to use. + pub config: Option, + + /// The chain this node is running. + /// + /// Possible values are either a built-in chain or the path to a chain specification file. + pub chain: Arc, + + /// Enable Prometheus metrics. + /// + /// The metrics will be served at the given interface and port. + pub metrics: Option, + + /// Add a new instance of a node. + /// + /// Configures the ports of the node to avoid conflicts with the defaults. + /// This is useful for running multiple nodes on the same machine. + /// + /// Max number of instances is 200. It is chosen in a way so that it's not possible to have + /// port numbers that conflict with each other. + /// + /// Changes to the following port numbers: + /// - DISCOVERY_PORT: default + `instance` - 1 + /// - AUTH_PORT: default + `instance` * 100 - 100 + /// - HTTP_RPC_PORT: default - `instance` + 1 + /// - WS_RPC_PORT: default + `instance` * 2 - 2 + pub instance: u16, + + /// Overrides the KZG trusted setup by reading from the supplied file. + pub trusted_setup_file: Option, + + /// All networking related arguments + pub network: NetworkArgs, + + /// All rpc related arguments + pub rpc: RpcServerArgs, + + /// All txpool related arguments with --txpool prefix + pub txpool: TxPoolArgs, + + /// All payload builder related arguments + pub builder: PayloadBuilderArgs, + + /// All debug related arguments with --debug prefix + pub debug: DebugArgs, + + /// All database related arguments + pub db: DatabaseArgs, + + /// All dev related arguments with --dev prefix + pub dev: DevArgs, + + /// All pruning related arguments + pub pruning: PruningArgs, + + /// Rollup related arguments + #[cfg(feature = "optimism")] + pub rollup: crate::args::RollupArgs, +} + +impl NodeBuilder { + /// Creates a testing [NodeBuilder], causing the database to be launched ephemerally. + pub fn test() -> Self { + Self { + database: DatabaseType::test(), + config: None, + chain: MAINNET.clone(), + metrics: None, + instance: 1, + trusted_setup_file: None, + network: NetworkArgs::default(), + rpc: RpcServerArgs::default(), + txpool: TxPoolArgs::default(), + builder: PayloadBuilderArgs::default(), + debug: DebugArgs::default(), + db: DatabaseArgs::default(), + dev: DevArgs::default(), + pruning: PruningArgs::default(), + #[cfg(feature = "optimism")] + rollup: crate::args::RollupArgs::default(), + } + } + + /// Set the datadir for the node + pub fn with_datadir(mut self, datadir: MaybePlatformPath) -> Self { + self.database = DatabaseType::Real(datadir); + self + } + + /// Set the config file for the node + pub fn with_config(mut self, config: impl Into) -> Self { + self.config = Some(config.into()); + self + } + + /// Set the chain for the node + pub fn with_chain(mut self, chain: Arc) -> Self { + self.chain = chain; + self + } + + /// Set the metrics address for the node + pub fn with_metrics(mut self, metrics: SocketAddr) -> Self { + self.metrics = Some(metrics); + self + } + + /// Set the instance for the node + pub fn with_instance(mut self, instance: u16) -> Self { + self.instance = instance; + self + } + + /// Set the [ChainSpec] for the node + pub fn with_chain_spec(mut self, chain: Arc) -> Self { + self.chain = chain; + self + } + + /// Set the trusted setup file for the node + pub fn with_trusted_setup_file(mut self, trusted_setup_file: impl Into) -> Self { + self.trusted_setup_file = Some(trusted_setup_file.into()); + self + } + + /// Set the network args for the node + pub fn with_network(mut self, network: NetworkArgs) -> Self { + self.network = network; + self + } + + /// Set the rpc args for the node + pub fn with_rpc(mut self, rpc: RpcServerArgs) -> Self { + self.rpc = rpc; + self + } + + /// Set the txpool args for the node + pub fn with_txpool(mut self, txpool: TxPoolArgs) -> Self { + self.txpool = txpool; + self + } + + /// Set the builder args for the node + pub fn with_builder(mut self, builder: PayloadBuilderArgs) -> Self { + self.builder = builder; + self + } + + /// Set the debug args for the node + pub fn with_debug(mut self, debug: DebugArgs) -> Self { + self.debug = debug; + self + } + + /// Set the database args for the node + pub fn with_db(mut self, db: DatabaseArgs) -> Self { + self.db = db; + self + } + + /// Set the dev args for the node + pub fn with_dev(mut self, dev: DevArgs) -> Self { + self.dev = dev; + self + } + + /// Set the pruning args for the node + pub fn with_pruning(mut self, pruning: PruningArgs) -> Self { + self.pruning = pruning; + self + } + + /// Set the node instance number + pub fn with_instance_number(mut self, instance: u16) -> Self { + self.instance = instance; + self + } + + /// Set the rollup args for the node + #[cfg(feature = "optimism")] + pub fn with_rollup(mut self, rollup: crate::args::RollupArgs) -> Self { + self.rollup = rollup; + self + } + + /// Launches the node, also adding any RPC extensions passed. + /// + /// # Example + /// ```rust + /// # use reth_tasks::{TaskManager, TaskSpawner}; + /// # use reth::cli::{ + /// # node_builder::NodeBuilder, + /// # ext::DefaultRethNodeCommandConfig, + /// # }; + /// # use tokio::runtime::Handle; + /// + /// async fn t() { + /// let handle = Handle::current(); + /// let manager = TaskManager::new(handle); + /// let executor = manager.executor(); + /// let builder = NodeBuilder::default(); + /// let ext = DefaultRethNodeCommandConfig::default(); + /// let handle = builder.launch::<()>(ext, executor).await.unwrap(); + /// } + /// ``` + pub async fn launch( + mut self, + ext: E::Node, + executor: TaskExecutor, + ) -> eyre::Result { + let database = std::mem::take(&mut self.database); + let db_instance = + DatabaseBuilder::new(database).build_db(self.db.log_level, self.chain.chain)?; + + match db_instance { + DatabaseInstance::Real { db, data_dir } => { + let builder = NodeBuilderWithDatabase { config: self, db, data_dir }; + builder.launch::(ext, executor).await + } + DatabaseInstance::Test { db, data_dir } => { + let builder = NodeBuilderWithDatabase { config: self, db, data_dir }; + builder.launch::(ext, executor).await + } + } + } + + /// Get the network secret from the given data dir + pub fn network_secret(&self, data_dir: &ChainPath) -> eyre::Result { + let network_secret_path = + self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret_path()); + debug!(target: "reth::cli", ?network_secret_path, "Loading p2p key file"); + let secret_key = get_secret_key(&network_secret_path)?; + Ok(secret_key) + } + + /// Returns the initial pipeline target, based on whether or not the node is running in + /// `debug.tip` mode, `debug.continuous` mode, or neither. + /// + /// If running in `debug.tip` mode, the configured tip is returned. + /// Otherwise, if running in `debug.continuous` mode, the genesis hash is returned. + /// Otherwise, `None` is returned. This is what the node will do by default. + pub fn initial_pipeline_target(&self, genesis_hash: B256) -> Option { + if let Some(tip) = self.debug.tip { + // Set the provided tip as the initial pipeline target. + debug!(target: "reth::cli", %tip, "Tip manually set"); + Some(tip) + } else if self.debug.continuous { + // Set genesis as the initial pipeline target. + // This will allow the downloader to start + debug!(target: "reth::cli", "Continuous sync mode enabled"); + Some(genesis_hash) + } else { + None + } + } + + /// Returns the max block that the node should run to, looking it up from the network if + /// necessary + pub async fn max_block( + &self, + network_client: &Client, + provider_factory: ProviderFactory, + ) -> eyre::Result> + where + DB: Database, + Client: HeadersClient, + { + let max_block = if let Some(block) = self.debug.max_block { + Some(block) + } else if let Some(tip) = self.debug.tip { + Some(self.lookup_or_fetch_tip(provider_factory, network_client, tip).await?) + } else { + None + }; + + Ok(max_block) + } + + /// Get the [MiningMode] from the given dev args + pub fn mining_mode(&self, pending_transactions_listener: Receiver) -> MiningMode { + if let Some(interval) = self.dev.block_time { + MiningMode::interval(interval) + } else if let Some(max_transactions) = self.dev.block_max_transactions { + MiningMode::instant(max_transactions, pending_transactions_listener) + } else { + info!(target: "reth::cli", "No mining mode specified, defaulting to ReadyTransaction"); + MiningMode::instant(1, pending_transactions_listener) + } + } + + /// Build a network and spawn it + pub async fn build_network( + &self, + config: &Config, + provider_factory: ProviderFactory, + executor: TaskExecutor, + head: Head, + data_dir: &ChainPath, + ) -> eyre::Result<(ProviderFactory, NetworkBuilder, (), ()>)> + where + DB: Database + Unpin + Clone + 'static, + { + info!(target: "reth::cli", "Connecting to P2P network"); + let network_secret_path = + self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret_path()); + debug!(target: "reth::cli", ?network_secret_path, "Loading p2p key file"); + let secret_key = get_secret_key(&network_secret_path)?; + let default_peers_path = data_dir.known_peers_path(); + let network_config = self.load_network_config( + config, + provider_factory, + executor.clone(), + head, + secret_key, + default_peers_path.clone(), + ); + + let client = network_config.client.clone(); + let builder = NetworkManager::builder(network_config).await?; + Ok((client, builder)) + } + + /// Build the blockchain tree + pub fn build_blockchain_tree( + &self, + provider_factory: ProviderFactory, + consensus: Arc, + prune_config: Option, + sync_metrics_tx: UnboundedSender, + tree_config: BlockchainTreeConfig, + ) -> eyre::Result> + where + DB: Database + Unpin + Clone + 'static, + { + // configure blockchain tree + let tree_externals = TreeExternals::new( + provider_factory.clone(), + consensus.clone(), + EvmProcessorFactory::new(self.chain.clone()), + ); + let tree = BlockchainTree::new( + tree_externals, + tree_config, + prune_config.clone().map(|config| config.segments), + )? + .with_sync_metrics_tx(sync_metrics_tx.clone()); + + Ok(tree) + } + + /// Build a transaction pool and spawn the transaction pool maintenance task + pub fn build_and_spawn_txpool( + &self, + blockchain_db: &BlockchainProvider, + head: Head, + executor: &TaskExecutor, + ) -> eyre::Result, InMemoryBlobStore>> + where + DB: Database + Unpin + Clone + 'static, + Tree: BlockchainTreeEngine + + BlockchainTreePendingStateProvider + + CanonStateSubscriptions + + Clone + + 'static, + { + let blob_store = InMemoryBlobStore::default(); + let validator = TransactionValidationTaskExecutor::eth_builder(Arc::clone(&self.chain)) + .with_head_timestamp(head.timestamp) + .kzg_settings(self.kzg_settings()?) + .with_additional_tasks(1) + .build_with_tasks(blockchain_db.clone(), executor.clone(), blob_store.clone()); + + let transaction_pool = + reth_transaction_pool::Pool::eth_pool(validator, blob_store, self.txpool.pool_config()); + info!(target: "reth::cli", "Transaction pool initialized"); + + // spawn txpool maintenance task + { + let pool = transaction_pool.clone(); + let chain_events = blockchain_db.canonical_state_stream(); + let client = blockchain_db.clone(); + executor.spawn_critical( + "txpool maintenance task", + reth_transaction_pool::maintain::maintain_transaction_pool_future( + client, + pool, + chain_events, + executor.clone(), + Default::default(), + ), + ); + debug!(target: "reth::cli", "Spawned txpool maintenance task"); + } + + Ok(transaction_pool) + } + + /// Returns the [Consensus] instance to use. + /// + /// By default this will be a [BeaconConsensus] instance, but if the `--dev` flag is set, it + /// will be an [AutoSealConsensus] instance. + pub fn consensus(&self) -> Arc { + if self.dev.dev { + Arc::new(AutoSealConsensus::new(Arc::clone(&self.chain))) + } else { + Arc::new(BeaconConsensus::new(Arc::clone(&self.chain))) + } + } + + /// Constructs a [Pipeline] that's wired to the network + #[allow(clippy::too_many_arguments)] + async fn build_networked_pipeline( + &self, + config: &StageConfig, + client: Client, + consensus: Arc, + provider_factory: ProviderFactory, + task_executor: &TaskExecutor, + metrics_tx: reth_stages::MetricEventsSender, + prune_config: Option, + max_block: Option, + ) -> eyre::Result> + where + DB: Database + Unpin + Clone + 'static, + Client: HeadersClient + BodiesClient + Clone + 'static, + { + // building network downloaders using the fetch client + let header_downloader = ReverseHeadersDownloaderBuilder::new(config.headers) + .build(client.clone(), Arc::clone(&consensus)) + .into_task_with(task_executor); + + let body_downloader = BodiesDownloaderBuilder::new(config.bodies) + .build(client, Arc::clone(&consensus), provider_factory.clone()) + .into_task_with(task_executor); + + let pipeline = self + .build_pipeline( + provider_factory, + config, + header_downloader, + body_downloader, + consensus, + max_block, + self.debug.continuous, + metrics_tx, + prune_config, + ) + .await?; + + Ok(pipeline) + } + + /// Returns the chain specific path to the data dir. This returns `None` if the database is + /// configured for testing. + fn data_dir(&self) -> Option> { + match &self.database { + DatabaseType::Real(data_dir) => { + Some(data_dir.unwrap_or_chain_default(self.chain.chain)) + } + DatabaseType::Test => None, + } + } + + /// Returns the path to the config file. + fn config_path(&self) -> Option { + let chain_dir = self.data_dir()?; + + let config = self.config.clone().unwrap_or_else(|| chain_dir.config_path()); + Some(config) + } + + /// Loads the reth config with the given datadir root + fn load_config(&self) -> eyre::Result { + let Some(config_path) = self.config_path() else { todo!() }; + + let mut config = confy::load_path::(&config_path) + .wrap_err_with(|| format!("Could not load config file {:?}", config_path))?; + + info!(target: "reth::cli", path = ?config_path, "Configuration loaded"); + + // Update the config with the command line arguments + config.peers.connect_trusted_nodes_only = self.network.trusted_only; + + if !self.network.trusted_peers.is_empty() { + info!(target: "reth::cli", "Adding trusted nodes"); + self.network.trusted_peers.iter().for_each(|peer| { + config.peers.trusted_nodes.insert(*peer); + }); + } + + Ok(config) + } + + /// Loads the trusted setup params from a given file path or falls back to + /// `MAINNET_KZG_TRUSTED_SETUP`. + fn kzg_settings(&self) -> eyre::Result> { + if let Some(ref trusted_setup_file) = self.trusted_setup_file { + let trusted_setup = KzgSettings::load_trusted_setup_file(trusted_setup_file) + .map_err(LoadKzgSettingsError::KzgError)?; + Ok(Arc::new(trusted_setup)) + } else { + Ok(Arc::clone(&MAINNET_KZG_TRUSTED_SETUP)) + } + } + + fn install_prometheus_recorder(&self) -> eyre::Result { + Ok(PROMETHEUS_RECORDER_HANDLE.clone()) + } + + async fn start_metrics_endpoint( + &self, + prometheus_handle: PrometheusHandle, + db: Metrics, + ) -> eyre::Result<()> + where + Metrics: DatabaseMetrics + 'static + Send + Sync, + { + if let Some(listen_addr) = self.metrics { + info!(target: "reth::cli", addr = %listen_addr, "Starting metrics endpoint"); + prometheus_exporter::serve( + listen_addr, + prometheus_handle, + db, + metrics_process::Collector::default(), + ) + .await?; + } + + Ok(()) + } + + /// Spawns the configured network and associated tasks and returns the [NetworkHandle] connected + /// to that network. + fn start_network( + &self, + builder: NetworkBuilder, + task_executor: &TaskExecutor, + pool: Pool, + client: C, + data_dir: &ChainPath, + ) -> NetworkHandle + where + C: BlockReader + HeaderProvider + Clone + Unpin + 'static, + Pool: TransactionPool + Unpin + 'static, + { + let (handle, network, txpool, eth) = + builder.transactions(pool).request_handler(client).split_with_handle(); + + task_executor.spawn_critical("p2p txpool", txpool); + task_executor.spawn_critical("p2p eth request handler", eth); + + let default_peers_path = data_dir.known_peers_path(); + let known_peers_file = self.network.persistent_peers_file(default_peers_path); + task_executor.spawn_critical_with_graceful_shutdown_signal( + "p2p network task", + |shutdown| { + network.run_until_graceful_shutdown(shutdown, |network| { + write_peers_to_file(network, known_peers_file) + }) + }, + ); + + handle + } + + /// Fetches the head block from the database. + /// + /// If the database is empty, returns the genesis block. + fn lookup_head(&self, factory: ProviderFactory) -> RethResult { + let provider = factory.provider()?; + + let head = provider.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default().block_number; + + let header = provider + .header_by_number(head)? + .expect("the header for the latest block is missing, database is corrupt"); + + let total_difficulty = provider + .header_td_by_number(head)? + .expect("the total difficulty for the latest block is missing, database is corrupt"); + + let hash = provider + .block_hash(head)? + .expect("the hash for the latest block is missing, database is corrupt"); + + Ok(Head { + number: head, + hash, + difficulty: header.difficulty, + total_difficulty, + timestamp: header.timestamp, + }) + } + + /// Attempt to look up the block number for the tip hash in the database. + /// If it doesn't exist, download the header and return the block number. + /// + /// NOTE: The download is attempted with infinite retries. + async fn lookup_or_fetch_tip( + &self, + provider_factory: ProviderFactory, + client: Client, + tip: B256, + ) -> RethResult + where + DB: Database, + Client: HeadersClient, + { + Ok(self.fetch_tip(provider_factory, client, BlockHashOrNumber::Hash(tip)).await?.number) + } + + /// Attempt to look up the block with the given number and return the header. + /// + /// NOTE: The download is attempted with infinite retries. + async fn fetch_tip( + &self, + factory: ProviderFactory, + client: Client, + tip: BlockHashOrNumber, + ) -> RethResult + where + DB: Database, + Client: HeadersClient, + { + let provider = factory.provider()?; + + let header = provider.header_by_hash_or_number(tip)?; + + // try to look up the header in the database + if let Some(header) = header { + info!(target: "reth::cli", ?tip, "Successfully looked up tip block in the database"); + return Ok(header.seal_slow()) + } + + info!(target: "reth::cli", ?tip, "Fetching tip block from the network."); + loop { + match get_single_header(&client, tip).await { + Ok(tip_header) => { + info!(target: "reth::cli", ?tip, "Successfully fetched tip"); + return Ok(tip_header) + } + Err(error) => { + error!(target: "reth::cli", %error, "Failed to fetch the tip. Retrying..."); + } + } + } + } + + fn load_network_config( + &self, + config: &Config, + provider_factory: ProviderFactory, + executor: TaskExecutor, + head: Head, + secret_key: SecretKey, + default_peers_path: PathBuf, + ) -> NetworkConfig> { + let cfg_builder = self + .network + .network_config(config, self.chain.clone(), secret_key, default_peers_path) + .with_task_executor(Box::new(executor)) + .set_head(head) + .listener_addr(SocketAddr::V4(SocketAddrV4::new( + self.network.addr, + // set discovery port based on instance number + self.network.port + self.instance - 1, + ))) + .discovery_addr(SocketAddr::V4(SocketAddrV4::new( + self.network.addr, + // set discovery port based on instance number + self.network.port + self.instance - 1, + ))); + + // When `sequencer_endpoint` is configured, the node will forward all transactions to a + // Sequencer node for execution and inclusion on L1, and disable its own txpool + // gossip to prevent other parties in the network from learning about them. + #[cfg(feature = "optimism")] + let cfg_builder = cfg_builder + .sequencer_endpoint(self.rollup.sequencer_http.clone()) + .disable_tx_gossip(self.rollup.disable_txpool_gossip); + + cfg_builder.build(provider_factory) + } + + #[allow(clippy::too_many_arguments)] + async fn build_pipeline( + &self, + provider_factory: ProviderFactory, + stage_config: &StageConfig, + header_downloader: H, + body_downloader: B, + consensus: Arc, + max_block: Option, + continuous: bool, + metrics_tx: reth_stages::MetricEventsSender, + prune_config: Option, + ) -> eyre::Result> + where + DB: Database + Clone + 'static, + H: HeaderDownloader + 'static, + B: BodyDownloader + 'static, + { + let mut builder = Pipeline::builder(); + + if let Some(max_block) = max_block { + debug!(target: "reth::cli", max_block, "Configuring builder to use max block"); + builder = builder.with_max_block(max_block) + } + + let (tip_tx, tip_rx) = watch::channel(B256::ZERO); + use reth_revm_inspectors::stack::InspectorStackConfig; + let factory = reth_revm::EvmProcessorFactory::new(self.chain.clone()); + + let stack_config = InspectorStackConfig { + use_printer_tracer: self.debug.print_inspector, + hook: if let Some(hook_block) = self.debug.hook_block { + Hook::Block(hook_block) + } else if let Some(tx) = self.debug.hook_transaction { + Hook::Transaction(tx) + } else if self.debug.hook_all { + Hook::All + } else { + Hook::None + }, + }; + + let factory = factory.with_stack_config(stack_config); + + let prune_modes = prune_config.map(|prune| prune.segments).unwrap_or_default(); + + let header_mode = + if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) }; + let pipeline = builder + .with_tip_sender(tip_tx) + .with_metrics_tx(metrics_tx.clone()) + .add_stages( + DefaultStages::new( + provider_factory.clone(), + header_mode, + Arc::clone(&consensus), + header_downloader, + body_downloader, + factory.clone(), + ) + .set( + TotalDifficultyStage::new(consensus) + .with_commit_threshold(stage_config.total_difficulty.commit_threshold), + ) + .set(SenderRecoveryStage { + commit_threshold: stage_config.sender_recovery.commit_threshold, + }) + .set( + ExecutionStage::new( + factory, + ExecutionStageThresholds { + max_blocks: stage_config.execution.max_blocks, + max_changes: stage_config.execution.max_changes, + max_cumulative_gas: stage_config.execution.max_cumulative_gas, + }, + stage_config + .merkle + .clean_threshold + .max(stage_config.account_hashing.clean_threshold) + .max(stage_config.storage_hashing.clean_threshold), + prune_modes.clone(), + ) + .with_metrics_tx(metrics_tx), + ) + .set(AccountHashingStage::new( + stage_config.account_hashing.clean_threshold, + stage_config.account_hashing.commit_threshold, + )) + .set(StorageHashingStage::new( + stage_config.storage_hashing.clean_threshold, + stage_config.storage_hashing.commit_threshold, + )) + .set(MerkleStage::new_execution(stage_config.merkle.clean_threshold)) + .set(TransactionLookupStage::new( + stage_config.transaction_lookup.commit_threshold, + prune_modes.transaction_lookup, + )) + .set(IndexAccountHistoryStage::new( + stage_config.index_account_history.commit_threshold, + prune_modes.account_history, + )) + .set(IndexStorageHistoryStage::new( + stage_config.index_storage_history.commit_threshold, + prune_modes.storage_history, + )), + ) + .build(provider_factory); + + Ok(pipeline) + } + + /// Change rpc port numbers based on the instance number, using the inner + /// [RpcServerArgs::adjust_instance_ports] method. + fn adjust_instance_ports(&mut self) { + self.rpc.adjust_instance_ports(self.instance); + } +} + +impl Default for NodeBuilder { + fn default() -> Self { + Self { + database: DatabaseType::default(), + config: None, + chain: MAINNET.clone(), + metrics: None, + instance: 1, + trusted_setup_file: None, + network: NetworkArgs::default(), + rpc: RpcServerArgs::default(), + txpool: TxPoolArgs::default(), + builder: PayloadBuilderArgs::default(), + debug: DebugArgs::default(), + db: DatabaseArgs::default(), + dev: DevArgs::default(), + pruning: PruningArgs::default(), + #[cfg(feature = "optimism")] + rollup: crate::args::RollupArgs::default(), + } + } +} + +/// A version of the [NodeBuilder] that has an installed database. This is used to construct the +/// [NodeHandle]. +/// +/// This also contains a path to a data dir that cannot be changed. +pub struct NodeBuilderWithDatabase { + /// The node config + pub config: NodeBuilder, + /// The database + pub db: Arc, + /// The data dir + pub data_dir: ChainPath, +} + +impl NodeBuilderWithDatabase { + /// Launch the node with the given extensions and executor + pub async fn launch( + mut self, + mut ext: E::Node, + executor: TaskExecutor, + ) -> eyre::Result { + info!(target: "reth::cli", "reth {} starting", SHORT_VERSION); + + // Raise the fd limit of the process. + // Does not do anything on windows. + raise_fd_limit()?; + + // get config + let config = self.config.load_config()?; + + let prometheus_handle = self.config.install_prometheus_recorder()?; + info!(target: "reth::cli", "Database opened"); + + let mut provider_factory = + ProviderFactory::new(Arc::clone(&self.db), Arc::clone(&self.config.chain)); + + // configure snapshotter + let snapshotter = reth_snapshot::Snapshotter::new( + provider_factory.clone(), + self.data_dir.snapshots_path(), + self.config.chain.snapshot_block_interval, + )?; + + provider_factory = provider_factory.with_snapshots( + self.data_dir.snapshots_path(), + snapshotter.highest_snapshot_receiver(), + )?; + + self.config.start_metrics_endpoint(prometheus_handle, Arc::clone(&self.db)).await?; + + debug!(target: "reth::cli", chain=%self.config.chain.chain, genesis=?self.config.chain.genesis_hash(), "Initializing genesis"); + + let genesis_hash = init_genesis(Arc::clone(&self.db), self.config.chain.clone())?; + + info!(target: "reth::cli", "{}", DisplayHardforks::new(self.config.chain.hardforks())); + + let consensus = self.config.consensus(); + + debug!(target: "reth::cli", "Spawning stages metrics listener task"); + let (sync_metrics_tx, sync_metrics_rx) = unbounded_channel(); + let sync_metrics_listener = reth_stages::MetricsListener::new(sync_metrics_rx); + executor.spawn_critical("stages metrics listener task", sync_metrics_listener); + + let prune_config = self + .config + .pruning + .prune_config(Arc::clone(&self.config.chain))? + .or(config.prune.clone()); + + // configure blockchain tree + let tree_config = BlockchainTreeConfig::default(); + let tree = self.config.build_blockchain_tree( + provider_factory.clone(), + consensus.clone(), + prune_config.clone(), + sync_metrics_tx.clone(), + tree_config, + )?; + let canon_state_notification_sender = tree.canon_state_notification_sender(); + let blockchain_tree = ShareableBlockchainTree::new(tree); + debug!(target: "reth::cli", "configured blockchain tree"); + + // fetch the head block from the database + let head = self + .config + .lookup_head(provider_factory.clone()) + .wrap_err("the head block is missing")?; + + // setup the blockchain provider + let blockchain_db = + BlockchainProvider::new(provider_factory.clone(), blockchain_tree.clone())?; + + // build transaction pool + let transaction_pool = + self.config.build_and_spawn_txpool(&blockchain_db, head, &executor)?; + + // build network + info!(target: "reth::cli", "Connecting to P2P network"); + let (network_client, mut network_builder) = self + .config + .build_network( + &config, + provider_factory.clone(), + executor.clone(), + head, + &self.data_dir, + ) + .await?; + + let components = RethNodeComponentsImpl { + provider: blockchain_db.clone(), + pool: transaction_pool.clone(), + network: network_builder.handle(), + task_executor: executor.clone(), + events: blockchain_db.clone(), + }; + + // allow network modifications + ext.configure_network(network_builder.network_mut(), &components)?; + + // launch network + let network = self.config.start_network( + network_builder, + &executor, + transaction_pool.clone(), + network_client, + &self.data_dir, + ); + + info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), enode = %network.local_node_record(), "Connected to P2P network"); + debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID"); + let network_client = network.fetch_client().await?; + + ext.on_components_initialized(&components)?; + + debug!(target: "reth::cli", "Spawning payload builder service"); + let payload_builder = + ext.spawn_payload_builder_service(&self.config.builder, &components)?; + + let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); + let max_block = self.config.max_block(&network_client, provider_factory.clone()).await?; + + // Configure the pipeline + let (mut pipeline, client) = if self.config.dev.dev { + info!(target: "reth::cli", "Starting Reth in dev mode"); + let mining_mode = + self.config.mining_mode(transaction_pool.pending_transactions_listener()); + + let (_, client, mut task) = AutoSealBuilder::new( + Arc::clone(&self.config.chain), + blockchain_db.clone(), + transaction_pool.clone(), + consensus_engine_tx.clone(), + canon_state_notification_sender, + mining_mode, + ) + .build(); + + let mut pipeline = self + .config + .build_networked_pipeline( + &config.stages, + client.clone(), + Arc::clone(&consensus), + provider_factory.clone(), + &executor, + sync_metrics_tx, + prune_config.clone(), + max_block, + ) + .await?; + + let pipeline_events = pipeline.events(); + task.set_pipeline_events(pipeline_events); + debug!(target: "reth::cli", "Spawning auto mine task"); + executor.spawn(Box::pin(task)); + + (pipeline, EitherDownloader::Left(client)) + } else { + let pipeline = self + .config + .build_networked_pipeline( + &config.stages, + network_client.clone(), + Arc::clone(&consensus), + provider_factory.clone(), + &executor.clone(), + sync_metrics_tx, + prune_config.clone(), + max_block, + ) + .await?; + + (pipeline, EitherDownloader::Right(network_client)) + }; + + let pipeline_events = pipeline.events(); + + let initial_target = self.config.initial_pipeline_target(genesis_hash); + let mut hooks = EngineHooks::new(); + + let pruner_events = if let Some(prune_config) = prune_config { + let mut pruner = PrunerBuilder::new(prune_config.clone()) + .max_reorg_depth(tree_config.max_reorg_depth() as usize) + .prune_delete_limit(self.config.chain.prune_delete_limit) + .build(provider_factory, snapshotter.highest_snapshot_receiver()); + + let events = pruner.events(); + hooks.add(PruneHook::new(pruner, Box::new(executor.clone()))); + + info!(target: "reth::cli", ?prune_config, "Pruner initialized"); + Either::Left(events) + } else { + Either::Right(stream::empty()) + }; + + // Configure the consensus engine + let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel( + client, + pipeline, + blockchain_db.clone(), + Box::new(executor.clone()), + Box::new(network.clone()), + max_block, + self.config.debug.continuous, + payload_builder.clone(), + initial_target, + MIN_BLOCKS_FOR_PIPELINE_RUN, + consensus_engine_tx, + consensus_engine_rx, + hooks, + )?; + info!(target: "reth::cli", "Consensus engine initialized"); + + let events = stream_select!( + network.event_listener().map(Into::into), + beacon_engine_handle.event_listener().map(Into::into), + pipeline_events.map(Into::into), + if self.config.debug.tip.is_none() { + Either::Left( + ConsensusLayerHealthEvents::new(Box::new(blockchain_db.clone())) + .map(Into::into), + ) + } else { + Either::Right(stream::empty()) + }, + pruner_events.map(Into::into) + ); + executor.spawn_critical( + "events task", + events::handle_events( + Some(network.clone()), + Some(head.number), + events, + self.db.clone(), + ), + ); + + let engine_api = EngineApi::new( + blockchain_db.clone(), + self.config.chain.clone(), + beacon_engine_handle, + payload_builder.into(), + Box::new(executor.clone()), + ); + info!(target: "reth::cli", "Engine API handler initialized"); + + // extract the jwt secret from the args if possible + let default_jwt_path = self.data_dir.jwt_path(); + let jwt_secret = self.config.rpc.auth_jwt_secret(default_jwt_path)?; + + // adjust rpc port numbers based on instance number + self.config.adjust_instance_ports(); + + // Start RPC servers + let rpc_server_handles = + self.config.rpc.start_servers(&components, engine_api, jwt_secret, &mut ext).await?; + + // Run consensus engine to completion + let (tx, rx) = oneshot::channel(); + info!(target: "reth::cli", "Starting consensus engine"); + executor.spawn_critical_blocking("consensus engine", async move { + let res = beacon_consensus_engine.await; + let _ = tx.send(res); + }); + + ext.on_node_started(&components)?; + + // If `enable_genesis_walkback` is set to true, the rollup client will need to + // perform the derivation pipeline from genesis, validating the data dir. + // When set to false, set the finalized, safe, and unsafe head block hashes + // on the rollup client using a fork choice update. This prevents the rollup + // client from performing the derivation pipeline from genesis, and instead + // starts syncing from the current tip in the DB. + #[cfg(feature = "optimism")] + if self.config.chain.is_optimism() && !self.config.rollup.enable_genesis_walkback { + let client = rpc_server_handles.auth.http_client(); + reth_rpc_api::EngineApiClient::fork_choice_updated_v2( + &client, + reth_rpc_types::engine::ForkchoiceState { + head_block_hash: head.hash, + safe_block_hash: head.hash, + finalized_block_hash: head.hash, + }, + None, + ) + .await?; + } + + // construct node handle and return + let node_handle = NodeHandle { + rpc_server_handles, + consensus_engine_rx: rx, + terminate: self.config.debug.terminate, + }; + Ok(node_handle) + } +} + +/// The [NodeHandle] contains the [RethRpcServerHandles] returned by the reth initialization +/// process, as well as a method for waiting for the node exit. +#[derive(Debug)] +pub struct NodeHandle { + /// The handles to the RPC servers + rpc_server_handles: RethRpcServerHandles, + + /// The receiver half of the channel for the consensus engine. + /// This can be used to wait for the consensus engine to exit. + consensus_engine_rx: oneshot::Receiver>, + + /// Flag indicating whether the node should be terminated after the pipeline sync. + terminate: bool, +} + +impl NodeHandle { + /// Returns the [RethRpcServerHandles] for this node. + pub fn rpc_server_handles(&self) -> &RethRpcServerHandles { + &self.rpc_server_handles + } + + /// Waits for the node to exit, if it was configured to exit. + pub async fn wait_for_node_exit(self) -> eyre::Result<()> { + self.consensus_engine_rx.await??; + info!(target: "reth::cli", "Consensus engine has exited."); + + if self.terminate { + Ok(()) + } else { + // The pipeline has finished downloading blocks up to `--debug.tip` or + // `--debug.max-block`. Keep other node components alive for further usage. + futures::future::pending().await + } + } +} + +/// A simple function to launch a node with the specified [NodeBuilder], spawning tasks on the +/// [TaskExecutor] constructed from [Handle::current]. +pub async fn spawn_node(config: NodeBuilder) -> eyre::Result { + let handle = Handle::current(); + let task_manager = TaskManager::new(handle); + let ext = DefaultRethNodeCommandConfig; + config.launch::<()>(ext, task_manager.executor()).await +} + +#[cfg(test)] +mod tests { + use super::*; + use reth_primitives::U256; + use reth_rpc_api::EthApiClient; + + #[tokio::test] + async fn block_number_node_config_test() { + // this launches a test node with http + let rpc_args = RpcServerArgs::default().with_http(); + + // NOTE: tests here manually set an instance number. The alternative would be to use an + // atomic counter. This works for `cargo test` but if tests would be run in `nextest` then + // they would become flaky. So new tests should manually set a unique instance number. + let handle = + spawn_node(NodeBuilder::test().with_rpc(rpc_args).with_instance(1)).await.unwrap(); + + // call a function on the node + let client = handle.rpc_server_handles().rpc.http_client().unwrap(); + let block_number = client.block_number().await.unwrap(); + + // it should be zero, since this is an ephemeral test node + assert_eq!(block_number, U256::ZERO); + } + + #[tokio::test] + async fn rpc_handles_none_without_http() { + // this launches a test node _without_ http + let handle = spawn_node(NodeBuilder::test().with_instance(2)).await.unwrap(); + + // ensure that the `http_client` is none + let maybe_client = handle.rpc_server_handles().rpc.http_client(); + assert!(maybe_client.is_none()); + } + + #[tokio::test] + async fn launch_multiple_nodes() { + // spawn_test_node takes roughly 1 second per node, so this test takes ~4 seconds + let num_nodes = 4; + + let starting_instance = 3; + let mut handles = Vec::new(); + for i in 0..num_nodes { + let handle = + spawn_node(NodeBuilder::test().with_instance(starting_instance + i)).await.unwrap(); + handles.push(handle); + } + } +} diff --git a/bin/reth/src/dirs.rs b/bin/reth/src/dirs.rs index 064b18e7f..7bcbd2ce3 100644 --- a/bin/reth/src/dirs.rs +++ b/bin/reth/src/dirs.rs @@ -190,6 +190,11 @@ impl MaybePlatformPath { ) } + /// Returns the default platform path for the specified [Chain]. + pub fn chain_default(chain: Chain) -> ChainPath { + PlatformPath::default().with_chain(chain) + } + /// Returns true if a custom path is set pub fn is_some(&self) -> bool { self.0.is_some() diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 7cecca9f2..664da40e1 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -3,90 +3,21 @@ //! Starts the client use crate::{ args::{ - get_secret_key, utils::{chain_help, genesis_value_parser, parse_socket_address, SUPPORTED_CHAINS}, DatabaseArgs, DebugArgs, DevArgs, NetworkArgs, PayloadBuilderArgs, PruningArgs, RpcServerArgs, TxPoolArgs, }, - cli::{ - components::RethNodeComponentsImpl, - config::{RethRpcConfig, RethTransactionPoolConfig}, - ext::{RethCliExt, RethNodeCommandConfig}, - }, - dirs::{ChainPath, DataDirPath, MaybePlatformPath}, - init::init_genesis, - node::cl_events::ConsensusLayerHealthEvents, - prometheus_exporter, + cli::{db_type::DatabaseType, ext::RethCliExt, node_builder::NodeBuilder}, + dirs::{DataDirPath, MaybePlatformPath}, runner::CliContext, - utils::get_single_header, version::SHORT_VERSION, }; use clap::{value_parser, Parser}; -use eyre::Context; -use futures::{future::Either, pin_mut, stream, stream_select, StreamExt}; -use metrics_exporter_prometheus::PrometheusHandle; -use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus, MiningMode}; -use reth_beacon_consensus::{ - hooks::{EngineHooks, PruneHook}, - BeaconConsensus, BeaconConsensusEngine, MIN_BLOCKS_FOR_PIPELINE_RUN, -}; -use reth_blockchain_tree::{ - config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree, -}; -use reth_config::{ - config::{PruneConfig, StageConfig}, - Config, -}; -use reth_db::{database::Database, database_metrics::DatabaseMetrics, init_db}; -use reth_downloaders::{ - bodies::bodies::BodiesDownloaderBuilder, - headers::reverse_headers::ReverseHeadersDownloaderBuilder, -}; -use reth_interfaces::{ - consensus::Consensus, - p2p::{ - bodies::{client::BodiesClient, downloader::BodyDownloader}, - either::EitherDownloader, - headers::{client::HeadersClient, downloader::HeaderDownloader}, - }, - RethResult, -}; -use reth_network::{NetworkBuilder, NetworkConfig, NetworkEvents, NetworkHandle, NetworkManager}; -use reth_network_api::{NetworkInfo, PeersInfo}; -use reth_primitives::{ - constants::eip4844::{LoadKzgSettingsError, MAINNET_KZG_TRUSTED_SETUP}, - fs, - kzg::KzgSettings, - stage::StageId, - BlockHashOrNumber, BlockNumber, ChainSpec, DisplayHardforks, Head, SealedHeader, B256, -}; -use reth_provider::{ - providers::BlockchainProvider, BlockHashReader, BlockReader, CanonStateSubscriptions, - HeaderProvider, HeaderSyncMode, ProviderFactory, StageCheckpointReader, -}; -use reth_prune::PrunerBuilder; -use reth_revm::EvmProcessorFactory; -use reth_revm_inspectors::stack::Hook; -use reth_rpc_engine_api::EngineApi; -use reth_stages::{ - prelude::*, - stages::{ - AccountHashingStage, ExecutionStage, ExecutionStageThresholds, IndexAccountHistoryStage, - IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage, - TotalDifficultyStage, TransactionLookupStage, - }, -}; -use reth_tasks::TaskExecutor; -use reth_transaction_pool::{ - blobstore::InMemoryBlobStore, TransactionPool, TransactionValidationTaskExecutor, -}; -use secp256k1::SecretKey; -use std::{ - net::{SocketAddr, SocketAddrV4}, - path::PathBuf, - sync::Arc, -}; -use tokio::sync::{mpsc::unbounded_channel, oneshot, watch}; +use reth_auto_seal_consensus::AutoSealConsensus; +use reth_beacon_consensus::BeaconConsensus; +use reth_interfaces::consensus::Consensus; +use reth_primitives::ChainSpec; +use std::{net::SocketAddr, path::PathBuf, sync::Arc}; use tracing::*; pub mod cl_events; @@ -236,357 +167,59 @@ impl NodeCommand { } /// Execute `node` command - pub async fn execute(mut self, ctx: CliContext) -> eyre::Result<()> { + pub async fn execute(self, ctx: CliContext) -> eyre::Result<()> { info!(target: "reth::cli", "reth {} starting", SHORT_VERSION); - // Raise the fd limit of the process. - // Does not do anything on windows. - let _ = fdlimit::raise_fd_limit(); + let Self { + datadir, + config, + chain, + metrics, + trusted_setup_file, + instance, + network, + rpc, + txpool, + builder, + debug, + db, + dev, + pruning, + #[cfg(feature = "optimism")] + rollup, + ext, + } = self; - // get config - let config = self.load_config()?; + // set up real database + let database = DatabaseType::Real(datadir); - let prometheus_handle = self.install_prometheus_recorder()?; - - let data_dir = self.data_dir(); - let db_path = data_dir.db_path(); - - info!(target: "reth::cli", path = ?db_path, "Opening database"); - let db = Arc::new(init_db(&db_path, self.db.log_level)?.with_metrics()); - info!(target: "reth::cli", "Database opened"); - - let mut provider_factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain)); - - // configure snapshotter - let snapshotter = reth_snapshot::Snapshotter::new( - provider_factory.clone(), - data_dir.snapshots_path(), - self.chain.snapshot_block_interval, - )?; - - provider_factory = provider_factory - .with_snapshots(data_dir.snapshots_path(), snapshotter.highest_snapshot_receiver())?; - - self.start_metrics_endpoint(prometheus_handle, Arc::clone(&db)).await?; - - debug!(target: "reth::cli", chain=%self.chain.chain, genesis=?self.chain.genesis_hash(), "Initializing genesis"); - - let genesis_hash = init_genesis(Arc::clone(&db), self.chain.clone())?; - - info!(target: "reth::cli", "{}", DisplayHardforks::new(self.chain.hardforks())); - - let consensus = self.consensus(); - - debug!(target: "reth::cli", "Spawning stages metrics listener task"); - let (sync_metrics_tx, sync_metrics_rx) = unbounded_channel(); - let sync_metrics_listener = reth_stages::MetricsListener::new(sync_metrics_rx); - ctx.task_executor.spawn_critical("stages metrics listener task", sync_metrics_listener); - - let prune_config = - self.pruning.prune_config(Arc::clone(&self.chain))?.or(config.prune.clone()); - - // configure blockchain tree - let tree_externals = TreeExternals::new( - provider_factory.clone(), - Arc::clone(&consensus), - EvmProcessorFactory::new(self.chain.clone()), - ); - let tree_config = BlockchainTreeConfig::default(); - let tree = BlockchainTree::new( - tree_externals, - tree_config, - prune_config.clone().map(|config| config.segments), - )? - .with_sync_metrics_tx(sync_metrics_tx.clone()); - let canon_state_notification_sender = tree.canon_state_notification_sender(); - let blockchain_tree = ShareableBlockchainTree::new(tree); - debug!(target: "reth::cli", "configured blockchain tree"); - - // fetch the head block from the database - let head = - self.lookup_head(provider_factory.clone()).wrap_err("the head block is missing")?; - - // setup the blockchain provider - let blockchain_db = - BlockchainProvider::new(provider_factory.clone(), blockchain_tree.clone())?; - let blob_store = InMemoryBlobStore::default(); - let validator = TransactionValidationTaskExecutor::eth_builder(Arc::clone(&self.chain)) - .with_head_timestamp(head.timestamp) - .kzg_settings(self.kzg_settings()?) - .with_additional_tasks(1) - .build_with_tasks(blockchain_db.clone(), ctx.task_executor.clone(), blob_store.clone()); - - let transaction_pool = - reth_transaction_pool::Pool::eth_pool(validator, blob_store, self.txpool.pool_config()); - info!(target: "reth::cli", "Transaction pool initialized"); - - // spawn txpool maintenance task - { - let pool = transaction_pool.clone(); - let chain_events = blockchain_db.canonical_state_stream(); - let client = blockchain_db.clone(); - ctx.task_executor.spawn_critical( - "txpool maintenance task", - reth_transaction_pool::maintain::maintain_transaction_pool_future( - client, - pool, - chain_events, - ctx.task_executor.clone(), - Default::default(), - ), - ); - debug!(target: "reth::cli", "Spawned txpool maintenance task"); - } - - info!(target: "reth::cli", "Connecting to P2P network"); - let network_secret_path = - self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret_path()); - debug!(target: "reth::cli", ?network_secret_path, "Loading p2p key file"); - let secret_key = get_secret_key(&network_secret_path)?; - let default_peers_path = data_dir.known_peers_path(); - let network_config = self.load_network_config( - &config, - provider_factory.clone(), - ctx.task_executor.clone(), - head, - secret_key, - default_peers_path.clone(), - ); - - let network_client = network_config.client.clone(); - let mut network_builder = NetworkManager::builder(network_config).await?; - - let components = RethNodeComponentsImpl { - provider: blockchain_db.clone(), - pool: transaction_pool.clone(), - network: network_builder.handle(), - task_executor: ctx.task_executor.clone(), - events: blockchain_db.clone(), + // set up node config + let node_config = NodeBuilder { + database, + config, + chain, + metrics, + instance, + trusted_setup_file, + network, + rpc, + txpool, + builder, + debug, + db, + dev, + pruning, + #[cfg(feature = "optimism")] + rollup, }; - // allow network modifications - self.ext.configure_network(network_builder.network_mut(), &components)?; + let executor = ctx.task_executor; - // launch network - let network = self.start_network( - network_builder, - &ctx.task_executor, - transaction_pool.clone(), - network_client, - default_peers_path, - ); + // launch the node + let handle = node_config.launch::(ext, executor).await?; - info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), enode = %network.local_node_record(), "Connected to P2P network"); - debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID"); - let network_client = network.fetch_client().await?; - - self.ext.on_components_initialized(&components)?; - - debug!(target: "reth::cli", "Spawning payload builder service"); - let payload_builder = self.ext.spawn_payload_builder_service(&self.builder, &components)?; - - let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); - let max_block = if let Some(block) = self.debug.max_block { - Some(block) - } else if let Some(tip) = self.debug.tip { - Some(self.lookup_or_fetch_tip(provider_factory.clone(), &network_client, tip).await?) - } else { - None - }; - - // Configure the pipeline - let (mut pipeline, client) = if self.dev.dev { - info!(target: "reth::cli", "Starting Reth in dev mode"); - - let mining_mode = if let Some(interval) = self.dev.block_time { - MiningMode::interval(interval) - } else if let Some(max_transactions) = self.dev.block_max_transactions { - MiningMode::instant( - max_transactions, - transaction_pool.pending_transactions_listener(), - ) - } else { - info!(target: "reth::cli", "No mining mode specified, defaulting to ReadyTransaction"); - MiningMode::instant(1, transaction_pool.pending_transactions_listener()) - }; - - let (_, client, mut task) = AutoSealBuilder::new( - Arc::clone(&self.chain), - blockchain_db.clone(), - transaction_pool.clone(), - consensus_engine_tx.clone(), - canon_state_notification_sender, - mining_mode, - ) - .build(); - - let mut pipeline = self - .build_networked_pipeline( - &config.stages, - client.clone(), - Arc::clone(&consensus), - provider_factory.clone(), - &ctx.task_executor, - sync_metrics_tx, - prune_config.clone(), - max_block, - ) - .await?; - - let pipeline_events = pipeline.events(); - task.set_pipeline_events(pipeline_events); - debug!(target: "reth::cli", "Spawning auto mine task"); - ctx.task_executor.spawn(Box::pin(task)); - - (pipeline, EitherDownloader::Left(client)) - } else { - let pipeline = self - .build_networked_pipeline( - &config.stages, - network_client.clone(), - Arc::clone(&consensus), - provider_factory.clone(), - &ctx.task_executor, - sync_metrics_tx, - prune_config.clone(), - max_block, - ) - .await?; - - (pipeline, EitherDownloader::Right(network_client)) - }; - - let pipeline_events = pipeline.events(); - - let initial_target = if let Some(tip) = self.debug.tip { - // Set the provided tip as the initial pipeline target. - debug!(target: "reth::cli", %tip, "Tip manually set"); - Some(tip) - } else if self.debug.continuous { - // Set genesis as the initial pipeline target. - // This will allow the downloader to start - debug!(target: "reth::cli", "Continuous sync mode enabled"); - Some(genesis_hash) - } else { - None - }; - - let mut hooks = EngineHooks::new(); - - let pruner_events = if let Some(prune_config) = prune_config { - let mut pruner = PrunerBuilder::new(prune_config.clone()) - .max_reorg_depth(tree_config.max_reorg_depth() as usize) - .prune_delete_limit(self.chain.prune_delete_limit) - .build(provider_factory, snapshotter.highest_snapshot_receiver()); - - let events = pruner.events(); - hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor.clone()))); - - info!(target: "reth::cli", ?prune_config, "Pruner initialized"); - Either::Left(events) - } else { - Either::Right(stream::empty()) - }; - - // Configure the consensus engine - let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel( - client, - pipeline, - blockchain_db.clone(), - Box::new(ctx.task_executor.clone()), - Box::new(network.clone()), - max_block, - self.debug.continuous, - payload_builder.clone(), - initial_target, - MIN_BLOCKS_FOR_PIPELINE_RUN, - consensus_engine_tx, - consensus_engine_rx, - hooks, - )?; - info!(target: "reth::cli", "Consensus engine initialized"); - - let events = stream_select!( - network.event_listener().map(Into::into), - beacon_engine_handle.event_listener().map(Into::into), - pipeline_events.map(Into::into), - if self.debug.tip.is_none() { - Either::Left( - ConsensusLayerHealthEvents::new(Box::new(blockchain_db.clone())) - .map(Into::into), - ) - } else { - Either::Right(stream::empty()) - }, - pruner_events.map(Into::into) - ); - ctx.task_executor.spawn_critical( - "events task", - events::handle_events(Some(network.clone()), Some(head.number), events, db.clone()), - ); - - let engine_api = EngineApi::new( - blockchain_db.clone(), - self.chain.clone(), - beacon_engine_handle, - payload_builder.into(), - Box::new(ctx.task_executor.clone()), - ); - info!(target: "reth::cli", "Engine API handler initialized"); - - // extract the jwt secret from the args if possible - let default_jwt_path = data_dir.jwt_path(); - let jwt_secret = self.rpc.auth_jwt_secret(default_jwt_path)?; - - // adjust rpc port numbers based on instance number - self.adjust_instance_ports(); - - // Start RPC servers - let _rpc_server_handles = - self.rpc.start_servers(&components, engine_api, jwt_secret, &mut self.ext).await?; - - // Run consensus engine to completion - let (tx, rx) = oneshot::channel(); - info!(target: "reth::cli", "Starting consensus engine"); - ctx.task_executor.spawn_critical_blocking("consensus engine", async move { - let res = beacon_consensus_engine.await; - let _ = tx.send(res); - }); - - self.ext.on_node_started(&components)?; - - // If `enable_genesis_walkback` is set to true, the rollup client will need to - // perform the derivation pipeline from genesis, validating the data dir. - // When set to false, set the finalized, safe, and unsafe head block hashes - // on the rollup client using a fork choice update. This prevents the rollup - // client from performing the derivation pipeline from genesis, and instead - // starts syncing from the current tip in the DB. - #[cfg(feature = "optimism")] - if self.chain.is_optimism() && !self.rollup.enable_genesis_walkback { - let client = _rpc_server_handles.auth.http_client(); - reth_rpc_api::EngineApiClient::fork_choice_updated_v2( - &client, - reth_rpc_types::engine::ForkchoiceState { - head_block_hash: head.hash, - safe_block_hash: head.hash, - finalized_block_hash: head.hash, - }, - None, - ) - .await?; - } - - rx.await??; - - info!(target: "reth::cli", "Consensus engine has exited."); - - if self.debug.terminate { - Ok(()) - } else { - // The pipeline has finished downloading blocks up to `--debug.tip` or - // `--debug.max-block`. Keep other node components alive for further usage. - futures::future::pending().await - } + // wait for node exit + handle.wait_for_node_exit().await } /// Returns the [Consensus] instance to use. @@ -600,425 +233,6 @@ impl NodeCommand { Arc::new(BeaconConsensus::new(Arc::clone(&self.chain))) } } - - /// Constructs a [Pipeline] that's wired to the network - #[allow(clippy::too_many_arguments)] - async fn build_networked_pipeline( - &self, - config: &StageConfig, - client: Client, - consensus: Arc, - provider_factory: ProviderFactory, - task_executor: &TaskExecutor, - metrics_tx: reth_stages::MetricEventsSender, - prune_config: Option, - max_block: Option, - ) -> eyre::Result> - where - DB: Database + Unpin + Clone + 'static, - Client: HeadersClient + BodiesClient + Clone + 'static, - { - // building network downloaders using the fetch client - let header_downloader = ReverseHeadersDownloaderBuilder::new(config.headers) - .build(client.clone(), Arc::clone(&consensus)) - .into_task_with(task_executor); - - let body_downloader = BodiesDownloaderBuilder::new(config.bodies) - .build(client, Arc::clone(&consensus), provider_factory.clone()) - .into_task_with(task_executor); - - let pipeline = self - .build_pipeline( - provider_factory, - config, - header_downloader, - body_downloader, - consensus, - max_block, - self.debug.continuous, - metrics_tx, - prune_config, - ) - .await?; - - Ok(pipeline) - } - - /// Returns the chain specific path to the data dir. - fn data_dir(&self) -> ChainPath { - self.datadir.unwrap_or_chain_default(self.chain.chain) - } - - /// Returns the path to the config file. - fn config_path(&self) -> PathBuf { - self.config.clone().unwrap_or_else(|| self.data_dir().config_path()) - } - - /// Loads the reth config with the given datadir root - fn load_config(&self) -> eyre::Result { - let config_path = self.config_path(); - let mut config = confy::load_path::(&config_path) - .wrap_err_with(|| format!("Could not load config file {:?}", config_path))?; - - info!(target: "reth::cli", path = ?config_path, "Configuration loaded"); - - // Update the config with the command line arguments - config.peers.connect_trusted_nodes_only = self.network.trusted_only; - - if !self.network.trusted_peers.is_empty() { - info!(target: "reth::cli", "Adding trusted nodes"); - self.network.trusted_peers.iter().for_each(|peer| { - config.peers.trusted_nodes.insert(*peer); - }); - } - - Ok(config) - } - - /// Loads the trusted setup params from a given file path or falls back to - /// `MAINNET_KZG_TRUSTED_SETUP`. - fn kzg_settings(&self) -> eyre::Result> { - if let Some(ref trusted_setup_file) = self.trusted_setup_file { - let trusted_setup = KzgSettings::load_trusted_setup_file(trusted_setup_file) - .map_err(LoadKzgSettingsError::KzgError)?; - Ok(Arc::new(trusted_setup)) - } else { - Ok(Arc::clone(&MAINNET_KZG_TRUSTED_SETUP)) - } - } - - fn install_prometheus_recorder(&self) -> eyre::Result { - prometheus_exporter::install_recorder() - } - - async fn start_metrics_endpoint( - &self, - prometheus_handle: PrometheusHandle, - db: Metrics, - ) -> eyre::Result<()> - where - Metrics: DatabaseMetrics + 'static + Send + Sync, - { - if let Some(listen_addr) = self.metrics { - info!(target: "reth::cli", addr = %listen_addr, "Starting metrics endpoint"); - prometheus_exporter::serve( - listen_addr, - prometheus_handle, - db, - metrics_process::Collector::default(), - ) - .await?; - } - - Ok(()) - } - - /// Spawns the configured network and associated tasks and returns the [NetworkHandle] connected - /// to that network. - fn start_network( - &self, - builder: NetworkBuilder, - task_executor: &TaskExecutor, - pool: Pool, - client: C, - default_peers_path: PathBuf, - ) -> NetworkHandle - where - C: BlockReader + HeaderProvider + Clone + Unpin + 'static, - Pool: TransactionPool + Unpin + 'static, - { - let (handle, network, txpool, eth) = - builder.transactions(pool).request_handler(client).split_with_handle(); - - task_executor.spawn_critical("p2p txpool", txpool); - task_executor.spawn_critical("p2p eth request handler", eth); - - let known_peers_file = self.network.persistent_peers_file(default_peers_path); - task_executor - .spawn_critical_with_graceful_shutdown_signal("p2p network task", |shutdown| { - run_network_until_shutdown(shutdown, network, known_peers_file) - }); - - handle - } - - /// Fetches the head block from the database. - /// - /// If the database is empty, returns the genesis block. - fn lookup_head(&self, factory: ProviderFactory) -> RethResult { - let provider = factory.provider()?; - - let head = provider.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default().block_number; - - let header = provider - .header_by_number(head)? - .expect("the header for the latest block is missing, database is corrupt"); - - let total_difficulty = provider - .header_td_by_number(head)? - .expect("the total difficulty for the latest block is missing, database is corrupt"); - - let hash = provider - .block_hash(head)? - .expect("the hash for the latest block is missing, database is corrupt"); - - Ok(Head { - number: head, - hash, - difficulty: header.difficulty, - total_difficulty, - timestamp: header.timestamp, - }) - } - - /// Attempt to look up the block number for the tip hash in the database. - /// If it doesn't exist, download the header and return the block number. - /// - /// NOTE: The download is attempted with infinite retries. - async fn lookup_or_fetch_tip( - &self, - provider_factory: ProviderFactory, - client: Client, - tip: B256, - ) -> RethResult - where - DB: Database, - Client: HeadersClient, - { - Ok(self.fetch_tip(provider_factory, client, BlockHashOrNumber::Hash(tip)).await?.number) - } - - /// Attempt to look up the block with the given number and return the header. - /// - /// NOTE: The download is attempted with infinite retries. - async fn fetch_tip( - &self, - factory: ProviderFactory, - client: Client, - tip: BlockHashOrNumber, - ) -> RethResult - where - DB: Database, - Client: HeadersClient, - { - let provider = factory.provider()?; - - let header = provider.header_by_hash_or_number(tip)?; - - // try to look up the header in the database - if let Some(header) = header { - info!(target: "reth::cli", ?tip, "Successfully looked up tip block in the database"); - return Ok(header.seal_slow()) - } - - info!(target: "reth::cli", ?tip, "Fetching tip block from the network."); - loop { - match get_single_header(&client, tip).await { - Ok(tip_header) => { - info!(target: "reth::cli", ?tip, "Successfully fetched tip"); - return Ok(tip_header) - } - Err(error) => { - error!(target: "reth::cli", %error, "Failed to fetch the tip. Retrying..."); - } - } - } - } - - fn load_network_config( - &self, - config: &Config, - provider_factory: ProviderFactory, - executor: TaskExecutor, - head: Head, - secret_key: SecretKey, - default_peers_path: PathBuf, - ) -> NetworkConfig> { - let cfg_builder = self - .network - .network_config(config, self.chain.clone(), secret_key, default_peers_path) - .with_task_executor(Box::new(executor)) - .set_head(head) - .listener_addr(SocketAddr::V4(SocketAddrV4::new( - self.network.addr, - // set discovery port based on instance number - self.network.port + self.instance - 1, - ))) - .discovery_addr(SocketAddr::V4(SocketAddrV4::new( - self.network.addr, - // set discovery port based on instance number - self.network.port + self.instance - 1, - ))); - - // When `sequencer_endpoint` is configured, the node will forward all transactions to a - // Sequencer node for execution and inclusion on L1, and disable its own txpool - // gossip to prevent other parties in the network from learning about them. - #[cfg(feature = "optimism")] - let cfg_builder = cfg_builder - .sequencer_endpoint(self.rollup.sequencer_http.clone()) - .disable_tx_gossip(self.rollup.disable_txpool_gossip); - - cfg_builder.build(provider_factory) - } - - #[allow(clippy::too_many_arguments)] - async fn build_pipeline( - &self, - provider_factory: ProviderFactory, - config: &StageConfig, - header_downloader: H, - body_downloader: B, - consensus: Arc, - max_block: Option, - continuous: bool, - metrics_tx: reth_stages::MetricEventsSender, - prune_config: Option, - ) -> eyre::Result> - where - DB: Database + Clone + 'static, - H: HeaderDownloader + 'static, - B: BodyDownloader + 'static, - { - let mut builder = Pipeline::builder(); - - if let Some(max_block) = max_block { - debug!(target: "reth::cli", max_block, "Configuring builder to use max block"); - builder = builder.with_max_block(max_block) - } - - let (tip_tx, tip_rx) = watch::channel(B256::ZERO); - use reth_revm_inspectors::stack::InspectorStackConfig; - let factory = reth_revm::EvmProcessorFactory::new(self.chain.clone()); - - let stack_config = InspectorStackConfig { - use_printer_tracer: self.debug.print_inspector, - hook: if let Some(hook_block) = self.debug.hook_block { - Hook::Block(hook_block) - } else if let Some(tx) = self.debug.hook_transaction { - Hook::Transaction(tx) - } else if self.debug.hook_all { - Hook::All - } else { - Hook::None - }, - }; - - let factory = factory.with_stack_config(stack_config); - - let prune_modes = prune_config.map(|prune| prune.segments).unwrap_or_default(); - - let header_mode = - if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) }; - let pipeline = builder - .with_tip_sender(tip_tx) - .with_metrics_tx(metrics_tx.clone()) - .add_stages( - DefaultStages::new( - provider_factory.clone(), - header_mode, - Arc::clone(&consensus), - header_downloader, - body_downloader, - factory.clone(), - ) - .set( - TotalDifficultyStage::new(consensus) - .with_commit_threshold(config.total_difficulty.commit_threshold), - ) - .set(SenderRecoveryStage { - commit_threshold: config.sender_recovery.commit_threshold, - }) - .set( - ExecutionStage::new( - factory, - ExecutionStageThresholds { - max_blocks: config.execution.max_blocks, - max_changes: config.execution.max_changes, - max_cumulative_gas: config.execution.max_cumulative_gas, - }, - config - .merkle - .clean_threshold - .max(config.account_hashing.clean_threshold) - .max(config.storage_hashing.clean_threshold), - prune_modes.clone(), - ) - .with_metrics_tx(metrics_tx), - ) - .set(AccountHashingStage::new( - config.account_hashing.clean_threshold, - config.account_hashing.commit_threshold, - )) - .set(StorageHashingStage::new( - config.storage_hashing.clean_threshold, - config.storage_hashing.commit_threshold, - )) - .set(MerkleStage::new_execution(config.merkle.clean_threshold)) - .set(TransactionLookupStage::new( - config.transaction_lookup.commit_threshold, - prune_modes.transaction_lookup, - )) - .set(IndexAccountHistoryStage::new( - config.index_account_history.commit_threshold, - prune_modes.account_history, - )) - .set(IndexStorageHistoryStage::new( - config.index_storage_history.commit_threshold, - prune_modes.storage_history, - )), - ) - .build(provider_factory); - - Ok(pipeline) - } - - /// Change rpc port numbers based on the instance number. - fn adjust_instance_ports(&mut self) { - // auth port is scaled by a factor of instance * 100 - self.rpc.auth_port += self.instance * 100 - 100; - // http port is scaled by a factor of -instance - self.rpc.http_port -= self.instance - 1; - // ws port is scaled by a factor of instance * 2 - self.rpc.ws_port += self.instance * 2 - 2; - } -} - -/// Drives the [NetworkManager] future until a [Shutdown](reth_tasks::shutdown::Shutdown) signal is -/// received. If configured, this writes known peers to `persistent_peers_file` afterwards. -async fn run_network_until_shutdown( - shutdown: reth_tasks::shutdown::GracefulShutdown, - network: NetworkManager, - persistent_peers_file: Option, -) where - C: BlockReader + HeaderProvider + Clone + Unpin + 'static, -{ - pin_mut!(network, shutdown); - - let mut graceful_guard = None; - tokio::select! { - _ = &mut network => {}, - guard = shutdown => { - graceful_guard = Some(guard); - }, - } - - if let Some(file_path) = persistent_peers_file { - let known_peers = network.all_peers().collect::>(); - if let Ok(known_peers) = serde_json::to_string_pretty(&known_peers) { - trace!(target: "reth::cli", peers_file =?file_path, num_peers=%known_peers.len(), "Saving current peers"); - let parent_dir = file_path.parent().map(fs::create_dir_all).transpose(); - match parent_dir.and_then(|_| fs::write(&file_path, known_peers)) { - Ok(_) => { - info!(target: "reth::cli", peers_file=?file_path, "Wrote network peers to file"); - } - Err(err) => { - warn!(target: "reth::cli", ?err, peers_file=?file_path, "Failed to write network peers to file"); - } - } - } - } - - drop(graceful_guard) } #[cfg(test)] @@ -1149,7 +363,7 @@ mod tests { #[test] fn parse_instance() { let mut cmd = NodeCommand::<()>::parse_from(["reth"]); - cmd.adjust_instance_ports(); + cmd.rpc.adjust_instance_ports(cmd.instance); cmd.network.port = DEFAULT_DISCOVERY_PORT + cmd.instance - 1; // check rpc port numbers assert_eq!(cmd.rpc.auth_port, 8551); @@ -1159,7 +373,7 @@ mod tests { assert_eq!(cmd.network.port, 30303); let mut cmd = NodeCommand::<()>::parse_from(["reth", "--instance", "2"]); - cmd.adjust_instance_ports(); + cmd.rpc.adjust_instance_ports(cmd.instance); cmd.network.port = DEFAULT_DISCOVERY_PORT + cmd.instance - 1; // check rpc port numbers assert_eq!(cmd.rpc.auth_port, 8651); @@ -1169,7 +383,7 @@ mod tests { assert_eq!(cmd.network.port, 30304); let mut cmd = NodeCommand::<()>::parse_from(["reth", "--instance", "3"]); - cmd.adjust_instance_ports(); + cmd.rpc.adjust_instance_ports(cmd.instance); cmd.network.port = DEFAULT_DISCOVERY_PORT + cmd.instance - 1; // check rpc port numbers assert_eq!(cmd.rpc.auth_port, 8751); diff --git a/bin/reth/src/utils.rs b/bin/reth/src/utils.rs index 74af5288c..38a824fb9 100644 --- a/bin/reth/src/utils.rs +++ b/bin/reth/src/utils.rs @@ -15,9 +15,11 @@ use reth_interfaces::p2p::{ headers::client::{HeadersClient, HeadersRequest}, priority::Priority, }; +use reth_network::NetworkManager; use reth_primitives::{ fs, BlockHashOrNumber, ChainSpec, HeadersDirection, SealedBlock, SealedHeader, }; +use reth_provider::BlockReader; use reth_rpc::{JwtError, JwtSecret}; use std::{ env::VarError, @@ -25,7 +27,7 @@ use std::{ rc::Rc, sync::Arc, }; -use tracing::{debug, info}; +use tracing::{debug, info, trace, warn}; /// Exposing `open_db_read_only` function pub mod db { @@ -255,8 +257,8 @@ impl ListFilter { self.len = len; } } -/// Attempts to retrieve or create a JWT secret from the specified path. +/// Attempts to retrieve or create a JWT secret from the specified path. pub fn get_or_create_jwt_secret_from_path(path: &Path) -> Result { if path.exists() { debug!(target: "reth::cli", ?path, "Reading JWT auth secret file"); @@ -266,3 +268,26 @@ pub fn get_or_create_jwt_secret_from_path(path: &Path) -> Result(network: &NetworkManager, persistent_peers_file: Option) +where + C: BlockReader + Unpin, +{ + if let Some(file_path) = persistent_peers_file { + let known_peers = network.all_peers().collect::>(); + if let Ok(known_peers) = serde_json::to_string_pretty(&known_peers) { + trace!(target: "reth::cli", peers_file =?file_path, num_peers=%known_peers.len(), "Saving current peers"); + let parent_dir = file_path.parent().map(fs::create_dir_all).transpose(); + match parent_dir.and_then(|_| fs::write(&file_path, known_peers)) { + Ok(_) => { + info!(target: "reth::cli", peers_file=?file_path, "Wrote network peers to file"); + } + Err(err) => { + warn!(target: "reth::cli", ?err, peers_file=?file_path, "Failed to write network peers to file"); + } + } + } + } +} diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 88cb8bdcb..456b542e3 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -33,7 +33,7 @@ use crate::{ transactions::NetworkTransactionEvent, FetchClient, NetworkBuilder, }; -use futures::{Future, StreamExt}; +use futures::{pin_mut, Future, StreamExt}; use parking_lot::Mutex; use reth_eth_wire::{ capability::{Capabilities, CapabilityMessage}, @@ -45,6 +45,7 @@ use reth_network_api::ReputationChangeKind; use reth_primitives::{ForkId, NodeRecord, PeerId, B256}; use reth_provider::{BlockNumReader, BlockReader}; use reth_rpc_types::{EthProtocolInfo, NetworkStatus}; +use reth_tasks::shutdown::GracefulShutdown; use reth_tokio_util::EventListeners; use secp256k1::SecretKey; use std::{ @@ -596,6 +597,34 @@ where } } +impl NetworkManager +where + C: BlockReader + Unpin, +{ + /// Drives the [NetworkManager] future until a [GracefulShutdown] signal is received. + /// + /// This also run the given function `shutdown_hook` afterwards. + pub async fn run_until_graceful_shutdown( + self, + shutdown: GracefulShutdown, + shutdown_hook: impl FnOnce(&mut Self), + ) { + let network = self; + pin_mut!(network, shutdown); + + let mut graceful_guard = None; + tokio::select! { + _ = &mut network => {}, + guard = shutdown => { + graceful_guard = Some(guard); + }, + } + + shutdown_hook(&mut network); + drop(graceful_guard); + } +} + impl Future for NetworkManager where C: BlockReader + Unpin, diff --git a/crates/storage/db/src/lib.rs b/crates/storage/db/src/lib.rs index 16ef51b4b..a57779d4f 100644 --- a/crates/storage/db/src/lib.rs +++ b/crates/storage/db/src/lib.rs @@ -226,9 +226,15 @@ pub mod test_utils { } } + /// Get a temporary directory path to use for the database + pub fn tempdir_path() -> PathBuf { + let builder = tempfile::Builder::new().prefix("reth-test-").rand_bytes(8).tempdir(); + builder.expect(ERROR_TEMPDIR).into_path() + } + /// Create read/write database for testing pub fn create_test_rw_db() -> Arc> { - let path = tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path(); + let path = tempdir_path(); let emsg = format!("{}: {:?}", ERROR_DB_CREATION, path); let db = init_db(&path, None).expect(&emsg); @@ -245,7 +251,7 @@ pub mod test_utils { /// Create read only database for testing pub fn create_test_ro_db() -> Arc> { - let path = tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path(); + let path = tempdir_path(); { init_db(path.as_path(), None).expect(ERROR_DB_CREATION); } diff --git a/crates/tracing/src/lib.rs b/crates/tracing/src/lib.rs index 61481013f..2fc551a5f 100644 --- a/crates/tracing/src/lib.rs +++ b/crates/tracing/src/lib.rs @@ -33,7 +33,8 @@ pub type BoxedLayer = Box + Send + Sync>; /// Initializes a new [Subscriber] based on the given layers. pub fn init(layers: Vec>) { - tracing_subscriber::registry().with(layers).init(); + // To avoid panicking in tests, we silently fail if we cannot initialize the subscriber. + let _ = tracing_subscriber::registry().with(layers).try_init(); } /// Builds a new tracing layer that writes to stdout.