From 3aa718a56141238dcdea8e61be1d1cd1c08d93b6 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 27 Dec 2023 14:23:32 +0100 Subject: [PATCH] Revert "feat: Introduce NodeBuilder" (#5868) --- Cargo.lock | 1 - bin/reth/Cargo.toml | 1 - bin/reth/src/args/rpc_server_args.rs | 25 - bin/reth/src/cli/db_type.rs | 108 -- bin/reth/src/cli/mod.rs | 2 - bin/reth/src/cli/node_builder.rs | 1354 -------------------------- bin/reth/src/dirs.rs | 5 - bin/reth/src/node/mod.rs | 896 +++++++++++++++-- bin/reth/src/utils.rs | 29 +- crates/net/network/src/manager.rs | 31 +- crates/storage/db/src/lib.rs | 10 +- crates/tracing/src/lib.rs | 3 +- 12 files changed, 847 insertions(+), 1618 deletions(-) delete mode 100644 bin/reth/src/cli/db_type.rs delete mode 100644 bin/reth/src/cli/node_builder.rs diff --git a/Cargo.lock b/Cargo.lock index c5482ac9f..8abc5ca87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5608,7 +5608,6 @@ dependencies = [ "metrics-exporter-prometheus", "metrics-process", "metrics-util", - "once_cell", "pin-project", "pretty_assertions", "procfs", diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index d1747b069..966f0cd0b 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -74,7 +74,6 @@ metrics-util = "0.15.0" metrics-process = "1.0.9" reth-metrics.workspace = true metrics.workspace = true -once_cell.workspace = true # test vectors generation proptest.workspace = true diff --git a/bin/reth/src/args/rpc_server_args.rs b/bin/reth/src/args/rpc_server_args.rs index 4a6e47c41..54f0f040f 100644 --- a/bin/reth/src/args/rpc_server_args.rs +++ b/bin/reth/src/args/rpc_server_args.rs @@ -179,31 +179,6 @@ pub struct RpcServerArgs { } impl RpcServerArgs { - /// Enables the HTTP-RPC server. - pub fn with_http(mut self) -> Self { - self.http = true; - self - } - - /// Enables the WS-RPC server. - pub fn with_ws(mut self) -> Self { - self.ws = true; - self - } - - /// Change rpc port numbers based on the instance number. - /// - /// Warning: if `instance` is zero, this will panic. - pub fn adjust_instance_ports(&mut self, instance: u16) { - debug_assert_ne!(instance, 0, "instance must be non-zero"); - // auth port is scaled by a factor of instance * 100 - self.auth_port += instance * 100 - 100; - // http port is scaled by a factor of -instance - self.http_port -= instance - 1; - // ws port is scaled by a factor of instance * 2 - self.ws_port += instance * 2 - 2; - } - /// Configures and launches _all_ servers. /// /// Returns the handles for the launched regular RPC server(s) (if any) and the server handle diff --git a/bin/reth/src/cli/db_type.rs b/bin/reth/src/cli/db_type.rs deleted file mode 100644 index a852f88c2..000000000 --- a/bin/reth/src/cli/db_type.rs +++ /dev/null @@ -1,108 +0,0 @@ -//! A real or test database type - -use crate::dirs::{ChainPath, DataDirPath, MaybePlatformPath}; -use reth_db::{ - init_db, - test_utils::{create_test_rw_db, TempDatabase}, - DatabaseEnv, -}; -use reth_interfaces::db::LogLevel; -use reth_primitives::Chain; -use std::{str::FromStr, sync::Arc}; - -/// A type that represents either a _real_ (represented by a path), or _test_ database, which will -/// use a [TempDatabase]. -#[derive(Debug)] -pub enum DatabaseType { - /// The real database type - Real(MaybePlatformPath), - /// The test database type - Test, -} - -/// The [Default] implementation for [DatabaseType] uses the _real_ variant, using the default -/// value for the inner [MaybePlatformPath]. -impl Default for DatabaseType { - fn default() -> Self { - Self::Real(MaybePlatformPath::::default()) - } -} - -impl DatabaseType { - /// Creates a _test_ database - pub fn test() -> Self { - Self::Test - } -} - -/// Type that represents a [DatabaseType] and [LogLevel], used to build a database type -pub struct DatabaseBuilder { - /// The database type - db_type: DatabaseType, -} - -impl DatabaseBuilder { - /// Creates the [DatabaseBuilder] with the given [DatabaseType] - pub fn new(db_type: DatabaseType) -> Self { - Self { db_type } - } - - /// Initializes and returns the [DatabaseInstance] depending on the current database type. If - /// the [DatabaseType] is test, the [LogLevel] is not used. - /// - /// If the [DatabaseType] is test, then the [ChainPath] constructed will be derived from the db - /// path of the [TempDatabase] and the given chain. - pub fn build_db( - self, - log_level: Option, - chain: Chain, - ) -> eyre::Result { - match self.db_type { - DatabaseType::Test => { - let db = create_test_rw_db(); - let db_path_str = db.path().to_str().expect("Path is not valid unicode"); - let path = MaybePlatformPath::::from_str(db_path_str) - .expect("Path is not valid"); - let data_dir = path.unwrap_or_chain_default(chain); - - Ok(DatabaseInstance::Test { db, data_dir }) - } - DatabaseType::Real(path) => { - let data_dir = path.unwrap_or_chain_default(chain); - - tracing::info!(target: "reth::cli", path = ?data_dir, "Opening database"); - let db = Arc::new(init_db(data_dir.clone(), log_level)?); - Ok(DatabaseInstance::Real { db, data_dir }) - } - } - } -} - -/// A constructed database type, with a [ChainPath]. -#[derive(Debug, Clone)] -pub enum DatabaseInstance { - /// The test database - Test { - /// The database - db: Arc>, - /// The data dir - data_dir: ChainPath, - }, - /// The real database - Real { - /// The database - db: Arc, - /// The data dir - data_dir: ChainPath, - }, -} - -impl DatabaseInstance { - /// Returns the data dir for this database instance - pub fn data_dir(&self) -> &ChainPath { - match self { - Self::Test { data_dir, .. } => data_dir, - Self::Real { data_dir, .. } => data_dir, - } - } -} diff --git a/bin/reth/src/cli/mod.rs b/bin/reth/src/cli/mod.rs index 0cd519433..14f25a279 100644 --- a/bin/reth/src/cli/mod.rs +++ b/bin/reth/src/cli/mod.rs @@ -21,9 +21,7 @@ use std::{fmt, fmt::Display, sync::Arc}; pub mod components; pub mod config; -pub mod db_type; pub mod ext; -pub mod node_builder; /// Default [directives](Directive) for [EnvFilter] which disables high-frequency debug logs from /// `hyper` and `trust-dns` diff --git a/bin/reth/src/cli/node_builder.rs b/bin/reth/src/cli/node_builder.rs deleted file mode 100644 index f72a863b1..000000000 --- a/bin/reth/src/cli/node_builder.rs +++ /dev/null @@ -1,1354 +0,0 @@ -//! Support for customizing the node -use super::{ - components::RethRpcServerHandles, db_type::DatabaseType, ext::DefaultRethNodeCommandConfig, -}; -use crate::{ - args::{ - get_secret_key, DatabaseArgs, DebugArgs, DevArgs, NetworkArgs, PayloadBuilderArgs, - PruningArgs, RpcServerArgs, TxPoolArgs, - }, - cli::{ - components::RethNodeComponentsImpl, - config::{RethRpcConfig, RethTransactionPoolConfig}, - db_type::{DatabaseBuilder, DatabaseInstance}, - ext::{RethCliExt, RethNodeCommandConfig}, - }, - dirs::{ChainPath, DataDirPath, MaybePlatformPath}, - init::init_genesis, - node::{cl_events::ConsensusLayerHealthEvents, events}, - prometheus_exporter, - utils::{get_single_header, write_peers_to_file}, - version::SHORT_VERSION, -}; -use eyre::Context; -use fdlimit::raise_fd_limit; -use futures::{future::Either, stream, stream_select, StreamExt}; -use metrics_exporter_prometheus::PrometheusHandle; -use once_cell::sync::Lazy; -use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus, MiningMode}; -use reth_beacon_consensus::{ - hooks::{EngineHooks, PruneHook}, - BeaconConsensus, BeaconConsensusEngine, BeaconConsensusEngineError, - MIN_BLOCKS_FOR_PIPELINE_RUN, -}; -use reth_blockchain_tree::{ - config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree, -}; -use reth_config::{ - config::{PruneConfig, StageConfig}, - Config, -}; -use reth_db::{ - database::Database, - database_metrics::{DatabaseMetadata, DatabaseMetrics}, -}; -use reth_downloaders::{ - bodies::bodies::BodiesDownloaderBuilder, - headers::reverse_headers::ReverseHeadersDownloaderBuilder, -}; -use reth_interfaces::{ - blockchain_tree::BlockchainTreeEngine, - consensus::Consensus, - p2p::{ - bodies::{client::BodiesClient, downloader::BodyDownloader}, - either::EitherDownloader, - headers::{client::HeadersClient, downloader::HeaderDownloader}, - }, - RethResult, -}; -use reth_network::{NetworkBuilder, NetworkConfig, NetworkEvents, NetworkHandle, NetworkManager}; -use reth_network_api::{NetworkInfo, PeersInfo}; -use reth_primitives::{ - constants::eip4844::{LoadKzgSettingsError, MAINNET_KZG_TRUSTED_SETUP}, - kzg::KzgSettings, - stage::StageId, - BlockHashOrNumber, BlockNumber, ChainSpec, DisplayHardforks, Head, SealedHeader, TxHash, B256, - MAINNET, -}; -use reth_provider::{ - providers::BlockchainProvider, BlockHashReader, BlockReader, - BlockchainTreePendingStateProvider, CanonStateSubscriptions, HeaderProvider, HeaderSyncMode, - ProviderFactory, StageCheckpointReader, -}; -use reth_prune::PrunerBuilder; -use reth_revm::EvmProcessorFactory; -use reth_revm_inspectors::stack::Hook; -use reth_rpc_engine_api::EngineApi; -use reth_stages::{ - prelude::*, - stages::{ - AccountHashingStage, ExecutionStage, ExecutionStageThresholds, IndexAccountHistoryStage, - IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage, - TotalDifficultyStage, TransactionLookupStage, - }, - MetricEvent, -}; -use reth_tasks::{TaskExecutor, TaskManager}; -use reth_transaction_pool::{ - blobstore::InMemoryBlobStore, EthTransactionPool, TransactionPool, - TransactionValidationTaskExecutor, -}; -use secp256k1::SecretKey; -use std::{ - net::{SocketAddr, SocketAddrV4}, - path::PathBuf, - sync::Arc, -}; -use tokio::{ - runtime::Handle, - sync::{ - mpsc::{unbounded_channel, Receiver, UnboundedSender}, - oneshot, watch, - }, -}; -use tracing::*; - -/// The default prometheus recorder handle. We use a global static to ensure that it is only -/// installed once. -pub static PROMETHEUS_RECORDER_HANDLE: Lazy = - Lazy::new(|| prometheus_exporter::install_recorder().unwrap()); - -/// Start the node -#[derive(Debug)] -pub struct NodeBuilder { - /// The test database - pub database: DatabaseType, - - /// The path to the configuration file to use. - pub config: Option, - - /// The chain this node is running. - /// - /// Possible values are either a built-in chain or the path to a chain specification file. - pub chain: Arc, - - /// Enable Prometheus metrics. - /// - /// The metrics will be served at the given interface and port. - pub metrics: Option, - - /// Add a new instance of a node. - /// - /// Configures the ports of the node to avoid conflicts with the defaults. - /// This is useful for running multiple nodes on the same machine. - /// - /// Max number of instances is 200. It is chosen in a way so that it's not possible to have - /// port numbers that conflict with each other. - /// - /// Changes to the following port numbers: - /// - DISCOVERY_PORT: default + `instance` - 1 - /// - AUTH_PORT: default + `instance` * 100 - 100 - /// - HTTP_RPC_PORT: default - `instance` + 1 - /// - WS_RPC_PORT: default + `instance` * 2 - 2 - pub instance: u16, - - /// Overrides the KZG trusted setup by reading from the supplied file. - pub trusted_setup_file: Option, - - /// All networking related arguments - pub network: NetworkArgs, - - /// All rpc related arguments - pub rpc: RpcServerArgs, - - /// All txpool related arguments with --txpool prefix - pub txpool: TxPoolArgs, - - /// All payload builder related arguments - pub builder: PayloadBuilderArgs, - - /// All debug related arguments with --debug prefix - pub debug: DebugArgs, - - /// All database related arguments - pub db: DatabaseArgs, - - /// All dev related arguments with --dev prefix - pub dev: DevArgs, - - /// All pruning related arguments - pub pruning: PruningArgs, - - /// Rollup related arguments - #[cfg(feature = "optimism")] - pub rollup: crate::args::RollupArgs, -} - -impl NodeBuilder { - /// Creates a testing [NodeBuilder], causing the database to be launched ephemerally. - pub fn test() -> Self { - Self { - database: DatabaseType::test(), - config: None, - chain: MAINNET.clone(), - metrics: None, - instance: 1, - trusted_setup_file: None, - network: NetworkArgs::default(), - rpc: RpcServerArgs::default(), - txpool: TxPoolArgs::default(), - builder: PayloadBuilderArgs::default(), - debug: DebugArgs::default(), - db: DatabaseArgs::default(), - dev: DevArgs::default(), - pruning: PruningArgs::default(), - #[cfg(feature = "optimism")] - rollup: crate::args::RollupArgs::default(), - } - } - - /// Set the datadir for the node - pub fn with_datadir(mut self, datadir: MaybePlatformPath) -> Self { - self.database = DatabaseType::Real(datadir); - self - } - - /// Set the config file for the node - pub fn with_config(mut self, config: impl Into) -> Self { - self.config = Some(config.into()); - self - } - - /// Set the chain for the node - pub fn with_chain(mut self, chain: Arc) -> Self { - self.chain = chain; - self - } - - /// Set the metrics address for the node - pub fn with_metrics(mut self, metrics: SocketAddr) -> Self { - self.metrics = Some(metrics); - self - } - - /// Set the instance for the node - pub fn with_instance(mut self, instance: u16) -> Self { - self.instance = instance; - self - } - - /// Set the [ChainSpec] for the node - pub fn with_chain_spec(mut self, chain: Arc) -> Self { - self.chain = chain; - self - } - - /// Set the trusted setup file for the node - pub fn with_trusted_setup_file(mut self, trusted_setup_file: impl Into) -> Self { - self.trusted_setup_file = Some(trusted_setup_file.into()); - self - } - - /// Set the network args for the node - pub fn with_network(mut self, network: NetworkArgs) -> Self { - self.network = network; - self - } - - /// Set the rpc args for the node - pub fn with_rpc(mut self, rpc: RpcServerArgs) -> Self { - self.rpc = rpc; - self - } - - /// Set the txpool args for the node - pub fn with_txpool(mut self, txpool: TxPoolArgs) -> Self { - self.txpool = txpool; - self - } - - /// Set the builder args for the node - pub fn with_builder(mut self, builder: PayloadBuilderArgs) -> Self { - self.builder = builder; - self - } - - /// Set the debug args for the node - pub fn with_debug(mut self, debug: DebugArgs) -> Self { - self.debug = debug; - self - } - - /// Set the database args for the node - pub fn with_db(mut self, db: DatabaseArgs) -> Self { - self.db = db; - self - } - - /// Set the dev args for the node - pub fn with_dev(mut self, dev: DevArgs) -> Self { - self.dev = dev; - self - } - - /// Set the pruning args for the node - pub fn with_pruning(mut self, pruning: PruningArgs) -> Self { - self.pruning = pruning; - self - } - - /// Set the node instance number - pub fn with_instance_number(mut self, instance: u16) -> Self { - self.instance = instance; - self - } - - /// Set the rollup args for the node - #[cfg(feature = "optimism")] - pub fn with_rollup(mut self, rollup: crate::args::RollupArgs) -> Self { - self.rollup = rollup; - self - } - - /// Launches the node, also adding any RPC extensions passed. - /// - /// # Example - /// ```rust - /// # use reth_tasks::{TaskManager, TaskSpawner}; - /// # use reth::cli::{ - /// # node_builder::NodeBuilder, - /// # ext::DefaultRethNodeCommandConfig, - /// # }; - /// # use tokio::runtime::Handle; - /// - /// async fn t() { - /// let handle = Handle::current(); - /// let manager = TaskManager::new(handle); - /// let executor = manager.executor(); - /// let builder = NodeBuilder::default(); - /// let ext = DefaultRethNodeCommandConfig::default(); - /// let handle = builder.launch::<()>(ext, executor).await.unwrap(); - /// } - /// ``` - pub async fn launch( - mut self, - ext: E::Node, - executor: TaskExecutor, - ) -> eyre::Result { - let database = std::mem::take(&mut self.database); - let db_instance = - DatabaseBuilder::new(database).build_db(self.db.log_level, self.chain.chain)?; - - match db_instance { - DatabaseInstance::Real { db, data_dir } => { - let builder = NodeBuilderWithDatabase { config: self, db, data_dir }; - builder.launch::(ext, executor).await - } - DatabaseInstance::Test { db, data_dir } => { - let builder = NodeBuilderWithDatabase { config: self, db, data_dir }; - builder.launch::(ext, executor).await - } - } - } - - /// Get the network secret from the given data dir - pub fn network_secret(&self, data_dir: &ChainPath) -> eyre::Result { - let network_secret_path = - self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret_path()); - debug!(target: "reth::cli", ?network_secret_path, "Loading p2p key file"); - let secret_key = get_secret_key(&network_secret_path)?; - Ok(secret_key) - } - - /// Returns the initial pipeline target, based on whether or not the node is running in - /// `debug.tip` mode, `debug.continuous` mode, or neither. - /// - /// If running in `debug.tip` mode, the configured tip is returned. - /// Otherwise, if running in `debug.continuous` mode, the genesis hash is returned. - /// Otherwise, `None` is returned. This is what the node will do by default. - pub fn initial_pipeline_target(&self, genesis_hash: B256) -> Option { - if let Some(tip) = self.debug.tip { - // Set the provided tip as the initial pipeline target. - debug!(target: "reth::cli", %tip, "Tip manually set"); - Some(tip) - } else if self.debug.continuous { - // Set genesis as the initial pipeline target. - // This will allow the downloader to start - debug!(target: "reth::cli", "Continuous sync mode enabled"); - Some(genesis_hash) - } else { - None - } - } - - /// Returns the max block that the node should run to, looking it up from the network if - /// necessary - pub async fn max_block( - &self, - network_client: &Client, - provider_factory: ProviderFactory, - ) -> eyre::Result> - where - DB: Database, - Client: HeadersClient, - { - let max_block = if let Some(block) = self.debug.max_block { - Some(block) - } else if let Some(tip) = self.debug.tip { - Some(self.lookup_or_fetch_tip(provider_factory, network_client, tip).await?) - } else { - None - }; - - Ok(max_block) - } - - /// Get the [MiningMode] from the given dev args - pub fn mining_mode(&self, pending_transactions_listener: Receiver) -> MiningMode { - if let Some(interval) = self.dev.block_time { - MiningMode::interval(interval) - } else if let Some(max_transactions) = self.dev.block_max_transactions { - MiningMode::instant(max_transactions, pending_transactions_listener) - } else { - info!(target: "reth::cli", "No mining mode specified, defaulting to ReadyTransaction"); - MiningMode::instant(1, pending_transactions_listener) - } - } - - /// Build a network and spawn it - pub async fn build_network( - &self, - config: &Config, - provider_factory: ProviderFactory, - executor: TaskExecutor, - head: Head, - data_dir: &ChainPath, - ) -> eyre::Result<(ProviderFactory, NetworkBuilder, (), ()>)> - where - DB: Database + Unpin + Clone + 'static, - { - info!(target: "reth::cli", "Connecting to P2P network"); - let network_secret_path = - self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret_path()); - debug!(target: "reth::cli", ?network_secret_path, "Loading p2p key file"); - let secret_key = get_secret_key(&network_secret_path)?; - let default_peers_path = data_dir.known_peers_path(); - let network_config = self.load_network_config( - config, - provider_factory, - executor.clone(), - head, - secret_key, - default_peers_path.clone(), - ); - - let client = network_config.client.clone(); - let builder = NetworkManager::builder(network_config).await?; - Ok((client, builder)) - } - - /// Build the blockchain tree - pub fn build_blockchain_tree( - &self, - provider_factory: ProviderFactory, - consensus: Arc, - prune_config: Option, - sync_metrics_tx: UnboundedSender, - tree_config: BlockchainTreeConfig, - ) -> eyre::Result> - where - DB: Database + Unpin + Clone + 'static, - { - // configure blockchain tree - let tree_externals = TreeExternals::new( - provider_factory.clone(), - consensus.clone(), - EvmProcessorFactory::new(self.chain.clone()), - ); - let tree = BlockchainTree::new( - tree_externals, - tree_config, - prune_config.clone().map(|config| config.segments), - )? - .with_sync_metrics_tx(sync_metrics_tx.clone()); - - Ok(tree) - } - - /// Build a transaction pool and spawn the transaction pool maintenance task - pub fn build_and_spawn_txpool( - &self, - blockchain_db: &BlockchainProvider, - head: Head, - executor: &TaskExecutor, - ) -> eyre::Result, InMemoryBlobStore>> - where - DB: Database + Unpin + Clone + 'static, - Tree: BlockchainTreeEngine - + BlockchainTreePendingStateProvider - + CanonStateSubscriptions - + Clone - + 'static, - { - let blob_store = InMemoryBlobStore::default(); - let validator = TransactionValidationTaskExecutor::eth_builder(Arc::clone(&self.chain)) - .with_head_timestamp(head.timestamp) - .kzg_settings(self.kzg_settings()?) - .with_additional_tasks(1) - .build_with_tasks(blockchain_db.clone(), executor.clone(), blob_store.clone()); - - let transaction_pool = - reth_transaction_pool::Pool::eth_pool(validator, blob_store, self.txpool.pool_config()); - info!(target: "reth::cli", "Transaction pool initialized"); - - // spawn txpool maintenance task - { - let pool = transaction_pool.clone(); - let chain_events = blockchain_db.canonical_state_stream(); - let client = blockchain_db.clone(); - executor.spawn_critical( - "txpool maintenance task", - reth_transaction_pool::maintain::maintain_transaction_pool_future( - client, - pool, - chain_events, - executor.clone(), - Default::default(), - ), - ); - debug!(target: "reth::cli", "Spawned txpool maintenance task"); - } - - Ok(transaction_pool) - } - - /// Returns the [Consensus] instance to use. - /// - /// By default this will be a [BeaconConsensus] instance, but if the `--dev` flag is set, it - /// will be an [AutoSealConsensus] instance. - pub fn consensus(&self) -> Arc { - if self.dev.dev { - Arc::new(AutoSealConsensus::new(Arc::clone(&self.chain))) - } else { - Arc::new(BeaconConsensus::new(Arc::clone(&self.chain))) - } - } - - /// Constructs a [Pipeline] that's wired to the network - #[allow(clippy::too_many_arguments)] - async fn build_networked_pipeline( - &self, - config: &StageConfig, - client: Client, - consensus: Arc, - provider_factory: ProviderFactory, - task_executor: &TaskExecutor, - metrics_tx: reth_stages::MetricEventsSender, - prune_config: Option, - max_block: Option, - ) -> eyre::Result> - where - DB: Database + Unpin + Clone + 'static, - Client: HeadersClient + BodiesClient + Clone + 'static, - { - // building network downloaders using the fetch client - let header_downloader = ReverseHeadersDownloaderBuilder::new(config.headers) - .build(client.clone(), Arc::clone(&consensus)) - .into_task_with(task_executor); - - let body_downloader = BodiesDownloaderBuilder::new(config.bodies) - .build(client, Arc::clone(&consensus), provider_factory.clone()) - .into_task_with(task_executor); - - let pipeline = self - .build_pipeline( - provider_factory, - config, - header_downloader, - body_downloader, - consensus, - max_block, - self.debug.continuous, - metrics_tx, - prune_config, - ) - .await?; - - Ok(pipeline) - } - - /// Returns the chain specific path to the data dir. This returns `None` if the database is - /// configured for testing. - fn data_dir(&self) -> Option> { - match &self.database { - DatabaseType::Real(data_dir) => { - Some(data_dir.unwrap_or_chain_default(self.chain.chain)) - } - DatabaseType::Test => None, - } - } - - /// Returns the path to the config file. - fn config_path(&self) -> Option { - let chain_dir = self.data_dir()?; - - let config = self.config.clone().unwrap_or_else(|| chain_dir.config_path()); - Some(config) - } - - /// Loads the reth config with the given datadir root - fn load_config(&self) -> eyre::Result { - let Some(config_path) = self.config_path() else { todo!() }; - - let mut config = confy::load_path::(&config_path) - .wrap_err_with(|| format!("Could not load config file {:?}", config_path))?; - - info!(target: "reth::cli", path = ?config_path, "Configuration loaded"); - - // Update the config with the command line arguments - config.peers.connect_trusted_nodes_only = self.network.trusted_only; - - if !self.network.trusted_peers.is_empty() { - info!(target: "reth::cli", "Adding trusted nodes"); - self.network.trusted_peers.iter().for_each(|peer| { - config.peers.trusted_nodes.insert(*peer); - }); - } - - Ok(config) - } - - /// Loads the trusted setup params from a given file path or falls back to - /// `MAINNET_KZG_TRUSTED_SETUP`. - fn kzg_settings(&self) -> eyre::Result> { - if let Some(ref trusted_setup_file) = self.trusted_setup_file { - let trusted_setup = KzgSettings::load_trusted_setup_file(trusted_setup_file) - .map_err(LoadKzgSettingsError::KzgError)?; - Ok(Arc::new(trusted_setup)) - } else { - Ok(Arc::clone(&MAINNET_KZG_TRUSTED_SETUP)) - } - } - - fn install_prometheus_recorder(&self) -> eyre::Result { - Ok(PROMETHEUS_RECORDER_HANDLE.clone()) - } - - async fn start_metrics_endpoint( - &self, - prometheus_handle: PrometheusHandle, - db: Metrics, - ) -> eyre::Result<()> - where - Metrics: DatabaseMetrics + 'static + Send + Sync, - { - if let Some(listen_addr) = self.metrics { - info!(target: "reth::cli", addr = %listen_addr, "Starting metrics endpoint"); - prometheus_exporter::serve( - listen_addr, - prometheus_handle, - db, - metrics_process::Collector::default(), - ) - .await?; - } - - Ok(()) - } - - /// Spawns the configured network and associated tasks and returns the [NetworkHandle] connected - /// to that network. - fn start_network( - &self, - builder: NetworkBuilder, - task_executor: &TaskExecutor, - pool: Pool, - client: C, - data_dir: &ChainPath, - ) -> NetworkHandle - where - C: BlockReader + HeaderProvider + Clone + Unpin + 'static, - Pool: TransactionPool + Unpin + 'static, - { - let (handle, network, txpool, eth) = - builder.transactions(pool).request_handler(client).split_with_handle(); - - task_executor.spawn_critical("p2p txpool", txpool); - task_executor.spawn_critical("p2p eth request handler", eth); - - let default_peers_path = data_dir.known_peers_path(); - let known_peers_file = self.network.persistent_peers_file(default_peers_path); - task_executor.spawn_critical_with_graceful_shutdown_signal( - "p2p network task", - |shutdown| { - network.run_until_graceful_shutdown(shutdown, |network| { - write_peers_to_file(network, known_peers_file) - }) - }, - ); - - handle - } - - /// Fetches the head block from the database. - /// - /// If the database is empty, returns the genesis block. - fn lookup_head(&self, factory: ProviderFactory) -> RethResult { - let provider = factory.provider()?; - - let head = provider.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default().block_number; - - let header = provider - .header_by_number(head)? - .expect("the header for the latest block is missing, database is corrupt"); - - let total_difficulty = provider - .header_td_by_number(head)? - .expect("the total difficulty for the latest block is missing, database is corrupt"); - - let hash = provider - .block_hash(head)? - .expect("the hash for the latest block is missing, database is corrupt"); - - Ok(Head { - number: head, - hash, - difficulty: header.difficulty, - total_difficulty, - timestamp: header.timestamp, - }) - } - - /// Attempt to look up the block number for the tip hash in the database. - /// If it doesn't exist, download the header and return the block number. - /// - /// NOTE: The download is attempted with infinite retries. - async fn lookup_or_fetch_tip( - &self, - provider_factory: ProviderFactory, - client: Client, - tip: B256, - ) -> RethResult - where - DB: Database, - Client: HeadersClient, - { - Ok(self.fetch_tip(provider_factory, client, BlockHashOrNumber::Hash(tip)).await?.number) - } - - /// Attempt to look up the block with the given number and return the header. - /// - /// NOTE: The download is attempted with infinite retries. - async fn fetch_tip( - &self, - factory: ProviderFactory, - client: Client, - tip: BlockHashOrNumber, - ) -> RethResult - where - DB: Database, - Client: HeadersClient, - { - let provider = factory.provider()?; - - let header = provider.header_by_hash_or_number(tip)?; - - // try to look up the header in the database - if let Some(header) = header { - info!(target: "reth::cli", ?tip, "Successfully looked up tip block in the database"); - return Ok(header.seal_slow()) - } - - info!(target: "reth::cli", ?tip, "Fetching tip block from the network."); - loop { - match get_single_header(&client, tip).await { - Ok(tip_header) => { - info!(target: "reth::cli", ?tip, "Successfully fetched tip"); - return Ok(tip_header) - } - Err(error) => { - error!(target: "reth::cli", %error, "Failed to fetch the tip. Retrying..."); - } - } - } - } - - fn load_network_config( - &self, - config: &Config, - provider_factory: ProviderFactory, - executor: TaskExecutor, - head: Head, - secret_key: SecretKey, - default_peers_path: PathBuf, - ) -> NetworkConfig> { - let cfg_builder = self - .network - .network_config(config, self.chain.clone(), secret_key, default_peers_path) - .with_task_executor(Box::new(executor)) - .set_head(head) - .listener_addr(SocketAddr::V4(SocketAddrV4::new( - self.network.addr, - // set discovery port based on instance number - self.network.port + self.instance - 1, - ))) - .discovery_addr(SocketAddr::V4(SocketAddrV4::new( - self.network.addr, - // set discovery port based on instance number - self.network.port + self.instance - 1, - ))); - - // When `sequencer_endpoint` is configured, the node will forward all transactions to a - // Sequencer node for execution and inclusion on L1, and disable its own txpool - // gossip to prevent other parties in the network from learning about them. - #[cfg(feature = "optimism")] - let cfg_builder = cfg_builder - .sequencer_endpoint(self.rollup.sequencer_http.clone()) - .disable_tx_gossip(self.rollup.disable_txpool_gossip); - - cfg_builder.build(provider_factory) - } - - #[allow(clippy::too_many_arguments)] - async fn build_pipeline( - &self, - provider_factory: ProviderFactory, - stage_config: &StageConfig, - header_downloader: H, - body_downloader: B, - consensus: Arc, - max_block: Option, - continuous: bool, - metrics_tx: reth_stages::MetricEventsSender, - prune_config: Option, - ) -> eyre::Result> - where - DB: Database + Clone + 'static, - H: HeaderDownloader + 'static, - B: BodyDownloader + 'static, - { - let mut builder = Pipeline::builder(); - - if let Some(max_block) = max_block { - debug!(target: "reth::cli", max_block, "Configuring builder to use max block"); - builder = builder.with_max_block(max_block) - } - - let (tip_tx, tip_rx) = watch::channel(B256::ZERO); - use reth_revm_inspectors::stack::InspectorStackConfig; - let factory = reth_revm::EvmProcessorFactory::new(self.chain.clone()); - - let stack_config = InspectorStackConfig { - use_printer_tracer: self.debug.print_inspector, - hook: if let Some(hook_block) = self.debug.hook_block { - Hook::Block(hook_block) - } else if let Some(tx) = self.debug.hook_transaction { - Hook::Transaction(tx) - } else if self.debug.hook_all { - Hook::All - } else { - Hook::None - }, - }; - - let factory = factory.with_stack_config(stack_config); - - let prune_modes = prune_config.map(|prune| prune.segments).unwrap_or_default(); - - let header_mode = - if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) }; - let pipeline = builder - .with_tip_sender(tip_tx) - .with_metrics_tx(metrics_tx.clone()) - .add_stages( - DefaultStages::new( - provider_factory.clone(), - header_mode, - Arc::clone(&consensus), - header_downloader, - body_downloader, - factory.clone(), - ) - .set( - TotalDifficultyStage::new(consensus) - .with_commit_threshold(stage_config.total_difficulty.commit_threshold), - ) - .set(SenderRecoveryStage { - commit_threshold: stage_config.sender_recovery.commit_threshold, - }) - .set( - ExecutionStage::new( - factory, - ExecutionStageThresholds { - max_blocks: stage_config.execution.max_blocks, - max_changes: stage_config.execution.max_changes, - max_cumulative_gas: stage_config.execution.max_cumulative_gas, - }, - stage_config - .merkle - .clean_threshold - .max(stage_config.account_hashing.clean_threshold) - .max(stage_config.storage_hashing.clean_threshold), - prune_modes.clone(), - ) - .with_metrics_tx(metrics_tx), - ) - .set(AccountHashingStage::new( - stage_config.account_hashing.clean_threshold, - stage_config.account_hashing.commit_threshold, - )) - .set(StorageHashingStage::new( - stage_config.storage_hashing.clean_threshold, - stage_config.storage_hashing.commit_threshold, - )) - .set(MerkleStage::new_execution(stage_config.merkle.clean_threshold)) - .set(TransactionLookupStage::new( - stage_config.transaction_lookup.commit_threshold, - prune_modes.transaction_lookup, - )) - .set(IndexAccountHistoryStage::new( - stage_config.index_account_history.commit_threshold, - prune_modes.account_history, - )) - .set(IndexStorageHistoryStage::new( - stage_config.index_storage_history.commit_threshold, - prune_modes.storage_history, - )), - ) - .build(provider_factory); - - Ok(pipeline) - } - - /// Change rpc port numbers based on the instance number, using the inner - /// [RpcServerArgs::adjust_instance_ports] method. - fn adjust_instance_ports(&mut self) { - self.rpc.adjust_instance_ports(self.instance); - } -} - -impl Default for NodeBuilder { - fn default() -> Self { - Self { - database: DatabaseType::default(), - config: None, - chain: MAINNET.clone(), - metrics: None, - instance: 1, - trusted_setup_file: None, - network: NetworkArgs::default(), - rpc: RpcServerArgs::default(), - txpool: TxPoolArgs::default(), - builder: PayloadBuilderArgs::default(), - debug: DebugArgs::default(), - db: DatabaseArgs::default(), - dev: DevArgs::default(), - pruning: PruningArgs::default(), - #[cfg(feature = "optimism")] - rollup: crate::args::RollupArgs::default(), - } - } -} - -/// A version of the [NodeBuilder] that has an installed database. This is used to construct the -/// [NodeHandle]. -/// -/// This also contains a path to a data dir that cannot be changed. -pub struct NodeBuilderWithDatabase { - /// The node config - pub config: NodeBuilder, - /// The database - pub db: Arc, - /// The data dir - pub data_dir: ChainPath, -} - -impl NodeBuilderWithDatabase { - /// Launch the node with the given extensions and executor - pub async fn launch( - mut self, - mut ext: E::Node, - executor: TaskExecutor, - ) -> eyre::Result { - info!(target: "reth::cli", "reth {} starting", SHORT_VERSION); - - // Raise the fd limit of the process. - // Does not do anything on windows. - raise_fd_limit()?; - - // get config - let config = self.config.load_config()?; - - let prometheus_handle = self.config.install_prometheus_recorder()?; - info!(target: "reth::cli", "Database opened"); - - let mut provider_factory = - ProviderFactory::new(Arc::clone(&self.db), Arc::clone(&self.config.chain)); - - // configure snapshotter - let snapshotter = reth_snapshot::Snapshotter::new( - provider_factory.clone(), - self.data_dir.snapshots_path(), - self.config.chain.snapshot_block_interval, - )?; - - provider_factory = provider_factory.with_snapshots( - self.data_dir.snapshots_path(), - snapshotter.highest_snapshot_receiver(), - )?; - - self.config.start_metrics_endpoint(prometheus_handle, Arc::clone(&self.db)).await?; - - debug!(target: "reth::cli", chain=%self.config.chain.chain, genesis=?self.config.chain.genesis_hash(), "Initializing genesis"); - - let genesis_hash = init_genesis(Arc::clone(&self.db), self.config.chain.clone())?; - - info!(target: "reth::cli", "{}", DisplayHardforks::new(self.config.chain.hardforks())); - - let consensus = self.config.consensus(); - - debug!(target: "reth::cli", "Spawning stages metrics listener task"); - let (sync_metrics_tx, sync_metrics_rx) = unbounded_channel(); - let sync_metrics_listener = reth_stages::MetricsListener::new(sync_metrics_rx); - executor.spawn_critical("stages metrics listener task", sync_metrics_listener); - - let prune_config = self - .config - .pruning - .prune_config(Arc::clone(&self.config.chain))? - .or(config.prune.clone()); - - // configure blockchain tree - let tree_config = BlockchainTreeConfig::default(); - let tree = self.config.build_blockchain_tree( - provider_factory.clone(), - consensus.clone(), - prune_config.clone(), - sync_metrics_tx.clone(), - tree_config, - )?; - let canon_state_notification_sender = tree.canon_state_notification_sender(); - let blockchain_tree = ShareableBlockchainTree::new(tree); - debug!(target: "reth::cli", "configured blockchain tree"); - - // fetch the head block from the database - let head = self - .config - .lookup_head(provider_factory.clone()) - .wrap_err("the head block is missing")?; - - // setup the blockchain provider - let blockchain_db = - BlockchainProvider::new(provider_factory.clone(), blockchain_tree.clone())?; - - // build transaction pool - let transaction_pool = - self.config.build_and_spawn_txpool(&blockchain_db, head, &executor)?; - - // build network - info!(target: "reth::cli", "Connecting to P2P network"); - let (network_client, mut network_builder) = self - .config - .build_network( - &config, - provider_factory.clone(), - executor.clone(), - head, - &self.data_dir, - ) - .await?; - - let components = RethNodeComponentsImpl { - provider: blockchain_db.clone(), - pool: transaction_pool.clone(), - network: network_builder.handle(), - task_executor: executor.clone(), - events: blockchain_db.clone(), - }; - - // allow network modifications - ext.configure_network(network_builder.network_mut(), &components)?; - - // launch network - let network = self.config.start_network( - network_builder, - &executor, - transaction_pool.clone(), - network_client, - &self.data_dir, - ); - - info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), enode = %network.local_node_record(), "Connected to P2P network"); - debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID"); - let network_client = network.fetch_client().await?; - - ext.on_components_initialized(&components)?; - - debug!(target: "reth::cli", "Spawning payload builder service"); - let payload_builder = - ext.spawn_payload_builder_service(&self.config.builder, &components)?; - - let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); - let max_block = self.config.max_block(&network_client, provider_factory.clone()).await?; - - // Configure the pipeline - let (mut pipeline, client) = if self.config.dev.dev { - info!(target: "reth::cli", "Starting Reth in dev mode"); - let mining_mode = - self.config.mining_mode(transaction_pool.pending_transactions_listener()); - - let (_, client, mut task) = AutoSealBuilder::new( - Arc::clone(&self.config.chain), - blockchain_db.clone(), - transaction_pool.clone(), - consensus_engine_tx.clone(), - canon_state_notification_sender, - mining_mode, - ) - .build(); - - let mut pipeline = self - .config - .build_networked_pipeline( - &config.stages, - client.clone(), - Arc::clone(&consensus), - provider_factory.clone(), - &executor, - sync_metrics_tx, - prune_config.clone(), - max_block, - ) - .await?; - - let pipeline_events = pipeline.events(); - task.set_pipeline_events(pipeline_events); - debug!(target: "reth::cli", "Spawning auto mine task"); - executor.spawn(Box::pin(task)); - - (pipeline, EitherDownloader::Left(client)) - } else { - let pipeline = self - .config - .build_networked_pipeline( - &config.stages, - network_client.clone(), - Arc::clone(&consensus), - provider_factory.clone(), - &executor.clone(), - sync_metrics_tx, - prune_config.clone(), - max_block, - ) - .await?; - - (pipeline, EitherDownloader::Right(network_client)) - }; - - let pipeline_events = pipeline.events(); - - let initial_target = self.config.initial_pipeline_target(genesis_hash); - let mut hooks = EngineHooks::new(); - - let pruner_events = if let Some(prune_config) = prune_config { - let mut pruner = PrunerBuilder::new(prune_config.clone()) - .max_reorg_depth(tree_config.max_reorg_depth() as usize) - .prune_delete_limit(self.config.chain.prune_delete_limit) - .build(provider_factory, snapshotter.highest_snapshot_receiver()); - - let events = pruner.events(); - hooks.add(PruneHook::new(pruner, Box::new(executor.clone()))); - - info!(target: "reth::cli", ?prune_config, "Pruner initialized"); - Either::Left(events) - } else { - Either::Right(stream::empty()) - }; - - // Configure the consensus engine - let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel( - client, - pipeline, - blockchain_db.clone(), - Box::new(executor.clone()), - Box::new(network.clone()), - max_block, - self.config.debug.continuous, - payload_builder.clone(), - initial_target, - MIN_BLOCKS_FOR_PIPELINE_RUN, - consensus_engine_tx, - consensus_engine_rx, - hooks, - )?; - info!(target: "reth::cli", "Consensus engine initialized"); - - let events = stream_select!( - network.event_listener().map(Into::into), - beacon_engine_handle.event_listener().map(Into::into), - pipeline_events.map(Into::into), - if self.config.debug.tip.is_none() { - Either::Left( - ConsensusLayerHealthEvents::new(Box::new(blockchain_db.clone())) - .map(Into::into), - ) - } else { - Either::Right(stream::empty()) - }, - pruner_events.map(Into::into) - ); - executor.spawn_critical( - "events task", - events::handle_events( - Some(network.clone()), - Some(head.number), - events, - self.db.clone(), - ), - ); - - let engine_api = EngineApi::new( - blockchain_db.clone(), - self.config.chain.clone(), - beacon_engine_handle, - payload_builder.into(), - Box::new(executor.clone()), - ); - info!(target: "reth::cli", "Engine API handler initialized"); - - // extract the jwt secret from the args if possible - let default_jwt_path = self.data_dir.jwt_path(); - let jwt_secret = self.config.rpc.auth_jwt_secret(default_jwt_path)?; - - // adjust rpc port numbers based on instance number - self.config.adjust_instance_ports(); - - // Start RPC servers - let rpc_server_handles = - self.config.rpc.start_servers(&components, engine_api, jwt_secret, &mut ext).await?; - - // Run consensus engine to completion - let (tx, rx) = oneshot::channel(); - info!(target: "reth::cli", "Starting consensus engine"); - executor.spawn_critical_blocking("consensus engine", async move { - let res = beacon_consensus_engine.await; - let _ = tx.send(res); - }); - - ext.on_node_started(&components)?; - - // If `enable_genesis_walkback` is set to true, the rollup client will need to - // perform the derivation pipeline from genesis, validating the data dir. - // When set to false, set the finalized, safe, and unsafe head block hashes - // on the rollup client using a fork choice update. This prevents the rollup - // client from performing the derivation pipeline from genesis, and instead - // starts syncing from the current tip in the DB. - #[cfg(feature = "optimism")] - if self.config.chain.is_optimism() && !self.config.rollup.enable_genesis_walkback { - let client = rpc_server_handles.auth.http_client(); - reth_rpc_api::EngineApiClient::fork_choice_updated_v2( - &client, - reth_rpc_types::engine::ForkchoiceState { - head_block_hash: head.hash, - safe_block_hash: head.hash, - finalized_block_hash: head.hash, - }, - None, - ) - .await?; - } - - // construct node handle and return - let node_handle = NodeHandle { - rpc_server_handles, - consensus_engine_rx: rx, - terminate: self.config.debug.terminate, - }; - Ok(node_handle) - } -} - -/// The [NodeHandle] contains the [RethRpcServerHandles] returned by the reth initialization -/// process, as well as a method for waiting for the node exit. -#[derive(Debug)] -pub struct NodeHandle { - /// The handles to the RPC servers - rpc_server_handles: RethRpcServerHandles, - - /// The receiver half of the channel for the consensus engine. - /// This can be used to wait for the consensus engine to exit. - consensus_engine_rx: oneshot::Receiver>, - - /// Flag indicating whether the node should be terminated after the pipeline sync. - terminate: bool, -} - -impl NodeHandle { - /// Returns the [RethRpcServerHandles] for this node. - pub fn rpc_server_handles(&self) -> &RethRpcServerHandles { - &self.rpc_server_handles - } - - /// Waits for the node to exit, if it was configured to exit. - pub async fn wait_for_node_exit(self) -> eyre::Result<()> { - self.consensus_engine_rx.await??; - info!(target: "reth::cli", "Consensus engine has exited."); - - if self.terminate { - Ok(()) - } else { - // The pipeline has finished downloading blocks up to `--debug.tip` or - // `--debug.max-block`. Keep other node components alive for further usage. - futures::future::pending().await - } - } -} - -/// A simple function to launch a node with the specified [NodeBuilder], spawning tasks on the -/// [TaskExecutor] constructed from [Handle::current]. -pub async fn spawn_node(config: NodeBuilder) -> eyre::Result { - let handle = Handle::current(); - let task_manager = TaskManager::new(handle); - let ext = DefaultRethNodeCommandConfig; - config.launch::<()>(ext, task_manager.executor()).await -} - -#[cfg(test)] -mod tests { - use super::*; - use reth_primitives::U256; - use reth_rpc_api::EthApiClient; - - #[tokio::test] - async fn block_number_node_config_test() { - // this launches a test node with http - let rpc_args = RpcServerArgs::default().with_http(); - - // NOTE: tests here manually set an instance number. The alternative would be to use an - // atomic counter. This works for `cargo test` but if tests would be run in `nextest` then - // they would become flaky. So new tests should manually set a unique instance number. - let handle = - spawn_node(NodeBuilder::test().with_rpc(rpc_args).with_instance(1)).await.unwrap(); - - // call a function on the node - let client = handle.rpc_server_handles().rpc.http_client().unwrap(); - let block_number = client.block_number().await.unwrap(); - - // it should be zero, since this is an ephemeral test node - assert_eq!(block_number, U256::ZERO); - } - - #[tokio::test] - async fn rpc_handles_none_without_http() { - // this launches a test node _without_ http - let handle = spawn_node(NodeBuilder::test().with_instance(2)).await.unwrap(); - - // ensure that the `http_client` is none - let maybe_client = handle.rpc_server_handles().rpc.http_client(); - assert!(maybe_client.is_none()); - } - - #[tokio::test] - async fn launch_multiple_nodes() { - // spawn_test_node takes roughly 1 second per node, so this test takes ~4 seconds - let num_nodes = 4; - - let starting_instance = 3; - let mut handles = Vec::new(); - for i in 0..num_nodes { - let handle = - spawn_node(NodeBuilder::test().with_instance(starting_instance + i)).await.unwrap(); - handles.push(handle); - } - } -} diff --git a/bin/reth/src/dirs.rs b/bin/reth/src/dirs.rs index 7bcbd2ce3..064b18e7f 100644 --- a/bin/reth/src/dirs.rs +++ b/bin/reth/src/dirs.rs @@ -190,11 +190,6 @@ impl MaybePlatformPath { ) } - /// Returns the default platform path for the specified [Chain]. - pub fn chain_default(chain: Chain) -> ChainPath { - PlatformPath::default().with_chain(chain) - } - /// Returns true if a custom path is set pub fn is_some(&self) -> bool { self.0.is_some() diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 664da40e1..7cecca9f2 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -3,21 +3,90 @@ //! Starts the client use crate::{ args::{ + get_secret_key, utils::{chain_help, genesis_value_parser, parse_socket_address, SUPPORTED_CHAINS}, DatabaseArgs, DebugArgs, DevArgs, NetworkArgs, PayloadBuilderArgs, PruningArgs, RpcServerArgs, TxPoolArgs, }, - cli::{db_type::DatabaseType, ext::RethCliExt, node_builder::NodeBuilder}, - dirs::{DataDirPath, MaybePlatformPath}, + cli::{ + components::RethNodeComponentsImpl, + config::{RethRpcConfig, RethTransactionPoolConfig}, + ext::{RethCliExt, RethNodeCommandConfig}, + }, + dirs::{ChainPath, DataDirPath, MaybePlatformPath}, + init::init_genesis, + node::cl_events::ConsensusLayerHealthEvents, + prometheus_exporter, runner::CliContext, + utils::get_single_header, version::SHORT_VERSION, }; use clap::{value_parser, Parser}; -use reth_auto_seal_consensus::AutoSealConsensus; -use reth_beacon_consensus::BeaconConsensus; -use reth_interfaces::consensus::Consensus; -use reth_primitives::ChainSpec; -use std::{net::SocketAddr, path::PathBuf, sync::Arc}; +use eyre::Context; +use futures::{future::Either, pin_mut, stream, stream_select, StreamExt}; +use metrics_exporter_prometheus::PrometheusHandle; +use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus, MiningMode}; +use reth_beacon_consensus::{ + hooks::{EngineHooks, PruneHook}, + BeaconConsensus, BeaconConsensusEngine, MIN_BLOCKS_FOR_PIPELINE_RUN, +}; +use reth_blockchain_tree::{ + config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree, +}; +use reth_config::{ + config::{PruneConfig, StageConfig}, + Config, +}; +use reth_db::{database::Database, database_metrics::DatabaseMetrics, init_db}; +use reth_downloaders::{ + bodies::bodies::BodiesDownloaderBuilder, + headers::reverse_headers::ReverseHeadersDownloaderBuilder, +}; +use reth_interfaces::{ + consensus::Consensus, + p2p::{ + bodies::{client::BodiesClient, downloader::BodyDownloader}, + either::EitherDownloader, + headers::{client::HeadersClient, downloader::HeaderDownloader}, + }, + RethResult, +}; +use reth_network::{NetworkBuilder, NetworkConfig, NetworkEvents, NetworkHandle, NetworkManager}; +use reth_network_api::{NetworkInfo, PeersInfo}; +use reth_primitives::{ + constants::eip4844::{LoadKzgSettingsError, MAINNET_KZG_TRUSTED_SETUP}, + fs, + kzg::KzgSettings, + stage::StageId, + BlockHashOrNumber, BlockNumber, ChainSpec, DisplayHardforks, Head, SealedHeader, B256, +}; +use reth_provider::{ + providers::BlockchainProvider, BlockHashReader, BlockReader, CanonStateSubscriptions, + HeaderProvider, HeaderSyncMode, ProviderFactory, StageCheckpointReader, +}; +use reth_prune::PrunerBuilder; +use reth_revm::EvmProcessorFactory; +use reth_revm_inspectors::stack::Hook; +use reth_rpc_engine_api::EngineApi; +use reth_stages::{ + prelude::*, + stages::{ + AccountHashingStage, ExecutionStage, ExecutionStageThresholds, IndexAccountHistoryStage, + IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage, + TotalDifficultyStage, TransactionLookupStage, + }, +}; +use reth_tasks::TaskExecutor; +use reth_transaction_pool::{ + blobstore::InMemoryBlobStore, TransactionPool, TransactionValidationTaskExecutor, +}; +use secp256k1::SecretKey; +use std::{ + net::{SocketAddr, SocketAddrV4}, + path::PathBuf, + sync::Arc, +}; +use tokio::sync::{mpsc::unbounded_channel, oneshot, watch}; use tracing::*; pub mod cl_events; @@ -167,59 +236,357 @@ impl NodeCommand { } /// Execute `node` command - pub async fn execute(self, ctx: CliContext) -> eyre::Result<()> { + pub async fn execute(mut self, ctx: CliContext) -> eyre::Result<()> { info!(target: "reth::cli", "reth {} starting", SHORT_VERSION); - let Self { - datadir, - config, - chain, - metrics, - trusted_setup_file, - instance, - network, - rpc, - txpool, - builder, - debug, - db, - dev, - pruning, - #[cfg(feature = "optimism")] - rollup, - ext, - } = self; + // Raise the fd limit of the process. + // Does not do anything on windows. + let _ = fdlimit::raise_fd_limit(); - // set up real database - let database = DatabaseType::Real(datadir); + // get config + let config = self.load_config()?; - // set up node config - let node_config = NodeBuilder { - database, - config, - chain, - metrics, - instance, - trusted_setup_file, - network, - rpc, - txpool, - builder, - debug, - db, - dev, - pruning, - #[cfg(feature = "optimism")] - rollup, + let prometheus_handle = self.install_prometheus_recorder()?; + + let data_dir = self.data_dir(); + let db_path = data_dir.db_path(); + + info!(target: "reth::cli", path = ?db_path, "Opening database"); + let db = Arc::new(init_db(&db_path, self.db.log_level)?.with_metrics()); + info!(target: "reth::cli", "Database opened"); + + let mut provider_factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain)); + + // configure snapshotter + let snapshotter = reth_snapshot::Snapshotter::new( + provider_factory.clone(), + data_dir.snapshots_path(), + self.chain.snapshot_block_interval, + )?; + + provider_factory = provider_factory + .with_snapshots(data_dir.snapshots_path(), snapshotter.highest_snapshot_receiver())?; + + self.start_metrics_endpoint(prometheus_handle, Arc::clone(&db)).await?; + + debug!(target: "reth::cli", chain=%self.chain.chain, genesis=?self.chain.genesis_hash(), "Initializing genesis"); + + let genesis_hash = init_genesis(Arc::clone(&db), self.chain.clone())?; + + info!(target: "reth::cli", "{}", DisplayHardforks::new(self.chain.hardforks())); + + let consensus = self.consensus(); + + debug!(target: "reth::cli", "Spawning stages metrics listener task"); + let (sync_metrics_tx, sync_metrics_rx) = unbounded_channel(); + let sync_metrics_listener = reth_stages::MetricsListener::new(sync_metrics_rx); + ctx.task_executor.spawn_critical("stages metrics listener task", sync_metrics_listener); + + let prune_config = + self.pruning.prune_config(Arc::clone(&self.chain))?.or(config.prune.clone()); + + // configure blockchain tree + let tree_externals = TreeExternals::new( + provider_factory.clone(), + Arc::clone(&consensus), + EvmProcessorFactory::new(self.chain.clone()), + ); + let tree_config = BlockchainTreeConfig::default(); + let tree = BlockchainTree::new( + tree_externals, + tree_config, + prune_config.clone().map(|config| config.segments), + )? + .with_sync_metrics_tx(sync_metrics_tx.clone()); + let canon_state_notification_sender = tree.canon_state_notification_sender(); + let blockchain_tree = ShareableBlockchainTree::new(tree); + debug!(target: "reth::cli", "configured blockchain tree"); + + // fetch the head block from the database + let head = + self.lookup_head(provider_factory.clone()).wrap_err("the head block is missing")?; + + // setup the blockchain provider + let blockchain_db = + BlockchainProvider::new(provider_factory.clone(), blockchain_tree.clone())?; + let blob_store = InMemoryBlobStore::default(); + let validator = TransactionValidationTaskExecutor::eth_builder(Arc::clone(&self.chain)) + .with_head_timestamp(head.timestamp) + .kzg_settings(self.kzg_settings()?) + .with_additional_tasks(1) + .build_with_tasks(blockchain_db.clone(), ctx.task_executor.clone(), blob_store.clone()); + + let transaction_pool = + reth_transaction_pool::Pool::eth_pool(validator, blob_store, self.txpool.pool_config()); + info!(target: "reth::cli", "Transaction pool initialized"); + + // spawn txpool maintenance task + { + let pool = transaction_pool.clone(); + let chain_events = blockchain_db.canonical_state_stream(); + let client = blockchain_db.clone(); + ctx.task_executor.spawn_critical( + "txpool maintenance task", + reth_transaction_pool::maintain::maintain_transaction_pool_future( + client, + pool, + chain_events, + ctx.task_executor.clone(), + Default::default(), + ), + ); + debug!(target: "reth::cli", "Spawned txpool maintenance task"); + } + + info!(target: "reth::cli", "Connecting to P2P network"); + let network_secret_path = + self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret_path()); + debug!(target: "reth::cli", ?network_secret_path, "Loading p2p key file"); + let secret_key = get_secret_key(&network_secret_path)?; + let default_peers_path = data_dir.known_peers_path(); + let network_config = self.load_network_config( + &config, + provider_factory.clone(), + ctx.task_executor.clone(), + head, + secret_key, + default_peers_path.clone(), + ); + + let network_client = network_config.client.clone(); + let mut network_builder = NetworkManager::builder(network_config).await?; + + let components = RethNodeComponentsImpl { + provider: blockchain_db.clone(), + pool: transaction_pool.clone(), + network: network_builder.handle(), + task_executor: ctx.task_executor.clone(), + events: blockchain_db.clone(), }; - let executor = ctx.task_executor; + // allow network modifications + self.ext.configure_network(network_builder.network_mut(), &components)?; - // launch the node - let handle = node_config.launch::(ext, executor).await?; + // launch network + let network = self.start_network( + network_builder, + &ctx.task_executor, + transaction_pool.clone(), + network_client, + default_peers_path, + ); - // wait for node exit - handle.wait_for_node_exit().await + info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), enode = %network.local_node_record(), "Connected to P2P network"); + debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID"); + let network_client = network.fetch_client().await?; + + self.ext.on_components_initialized(&components)?; + + debug!(target: "reth::cli", "Spawning payload builder service"); + let payload_builder = self.ext.spawn_payload_builder_service(&self.builder, &components)?; + + let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); + let max_block = if let Some(block) = self.debug.max_block { + Some(block) + } else if let Some(tip) = self.debug.tip { + Some(self.lookup_or_fetch_tip(provider_factory.clone(), &network_client, tip).await?) + } else { + None + }; + + // Configure the pipeline + let (mut pipeline, client) = if self.dev.dev { + info!(target: "reth::cli", "Starting Reth in dev mode"); + + let mining_mode = if let Some(interval) = self.dev.block_time { + MiningMode::interval(interval) + } else if let Some(max_transactions) = self.dev.block_max_transactions { + MiningMode::instant( + max_transactions, + transaction_pool.pending_transactions_listener(), + ) + } else { + info!(target: "reth::cli", "No mining mode specified, defaulting to ReadyTransaction"); + MiningMode::instant(1, transaction_pool.pending_transactions_listener()) + }; + + let (_, client, mut task) = AutoSealBuilder::new( + Arc::clone(&self.chain), + blockchain_db.clone(), + transaction_pool.clone(), + consensus_engine_tx.clone(), + canon_state_notification_sender, + mining_mode, + ) + .build(); + + let mut pipeline = self + .build_networked_pipeline( + &config.stages, + client.clone(), + Arc::clone(&consensus), + provider_factory.clone(), + &ctx.task_executor, + sync_metrics_tx, + prune_config.clone(), + max_block, + ) + .await?; + + let pipeline_events = pipeline.events(); + task.set_pipeline_events(pipeline_events); + debug!(target: "reth::cli", "Spawning auto mine task"); + ctx.task_executor.spawn(Box::pin(task)); + + (pipeline, EitherDownloader::Left(client)) + } else { + let pipeline = self + .build_networked_pipeline( + &config.stages, + network_client.clone(), + Arc::clone(&consensus), + provider_factory.clone(), + &ctx.task_executor, + sync_metrics_tx, + prune_config.clone(), + max_block, + ) + .await?; + + (pipeline, EitherDownloader::Right(network_client)) + }; + + let pipeline_events = pipeline.events(); + + let initial_target = if let Some(tip) = self.debug.tip { + // Set the provided tip as the initial pipeline target. + debug!(target: "reth::cli", %tip, "Tip manually set"); + Some(tip) + } else if self.debug.continuous { + // Set genesis as the initial pipeline target. + // This will allow the downloader to start + debug!(target: "reth::cli", "Continuous sync mode enabled"); + Some(genesis_hash) + } else { + None + }; + + let mut hooks = EngineHooks::new(); + + let pruner_events = if let Some(prune_config) = prune_config { + let mut pruner = PrunerBuilder::new(prune_config.clone()) + .max_reorg_depth(tree_config.max_reorg_depth() as usize) + .prune_delete_limit(self.chain.prune_delete_limit) + .build(provider_factory, snapshotter.highest_snapshot_receiver()); + + let events = pruner.events(); + hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor.clone()))); + + info!(target: "reth::cli", ?prune_config, "Pruner initialized"); + Either::Left(events) + } else { + Either::Right(stream::empty()) + }; + + // Configure the consensus engine + let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel( + client, + pipeline, + blockchain_db.clone(), + Box::new(ctx.task_executor.clone()), + Box::new(network.clone()), + max_block, + self.debug.continuous, + payload_builder.clone(), + initial_target, + MIN_BLOCKS_FOR_PIPELINE_RUN, + consensus_engine_tx, + consensus_engine_rx, + hooks, + )?; + info!(target: "reth::cli", "Consensus engine initialized"); + + let events = stream_select!( + network.event_listener().map(Into::into), + beacon_engine_handle.event_listener().map(Into::into), + pipeline_events.map(Into::into), + if self.debug.tip.is_none() { + Either::Left( + ConsensusLayerHealthEvents::new(Box::new(blockchain_db.clone())) + .map(Into::into), + ) + } else { + Either::Right(stream::empty()) + }, + pruner_events.map(Into::into) + ); + ctx.task_executor.spawn_critical( + "events task", + events::handle_events(Some(network.clone()), Some(head.number), events, db.clone()), + ); + + let engine_api = EngineApi::new( + blockchain_db.clone(), + self.chain.clone(), + beacon_engine_handle, + payload_builder.into(), + Box::new(ctx.task_executor.clone()), + ); + info!(target: "reth::cli", "Engine API handler initialized"); + + // extract the jwt secret from the args if possible + let default_jwt_path = data_dir.jwt_path(); + let jwt_secret = self.rpc.auth_jwt_secret(default_jwt_path)?; + + // adjust rpc port numbers based on instance number + self.adjust_instance_ports(); + + // Start RPC servers + let _rpc_server_handles = + self.rpc.start_servers(&components, engine_api, jwt_secret, &mut self.ext).await?; + + // Run consensus engine to completion + let (tx, rx) = oneshot::channel(); + info!(target: "reth::cli", "Starting consensus engine"); + ctx.task_executor.spawn_critical_blocking("consensus engine", async move { + let res = beacon_consensus_engine.await; + let _ = tx.send(res); + }); + + self.ext.on_node_started(&components)?; + + // If `enable_genesis_walkback` is set to true, the rollup client will need to + // perform the derivation pipeline from genesis, validating the data dir. + // When set to false, set the finalized, safe, and unsafe head block hashes + // on the rollup client using a fork choice update. This prevents the rollup + // client from performing the derivation pipeline from genesis, and instead + // starts syncing from the current tip in the DB. + #[cfg(feature = "optimism")] + if self.chain.is_optimism() && !self.rollup.enable_genesis_walkback { + let client = _rpc_server_handles.auth.http_client(); + reth_rpc_api::EngineApiClient::fork_choice_updated_v2( + &client, + reth_rpc_types::engine::ForkchoiceState { + head_block_hash: head.hash, + safe_block_hash: head.hash, + finalized_block_hash: head.hash, + }, + None, + ) + .await?; + } + + rx.await??; + + info!(target: "reth::cli", "Consensus engine has exited."); + + if self.debug.terminate { + Ok(()) + } else { + // The pipeline has finished downloading blocks up to `--debug.tip` or + // `--debug.max-block`. Keep other node components alive for further usage. + futures::future::pending().await + } } /// Returns the [Consensus] instance to use. @@ -233,6 +600,425 @@ impl NodeCommand { Arc::new(BeaconConsensus::new(Arc::clone(&self.chain))) } } + + /// Constructs a [Pipeline] that's wired to the network + #[allow(clippy::too_many_arguments)] + async fn build_networked_pipeline( + &self, + config: &StageConfig, + client: Client, + consensus: Arc, + provider_factory: ProviderFactory, + task_executor: &TaskExecutor, + metrics_tx: reth_stages::MetricEventsSender, + prune_config: Option, + max_block: Option, + ) -> eyre::Result> + where + DB: Database + Unpin + Clone + 'static, + Client: HeadersClient + BodiesClient + Clone + 'static, + { + // building network downloaders using the fetch client + let header_downloader = ReverseHeadersDownloaderBuilder::new(config.headers) + .build(client.clone(), Arc::clone(&consensus)) + .into_task_with(task_executor); + + let body_downloader = BodiesDownloaderBuilder::new(config.bodies) + .build(client, Arc::clone(&consensus), provider_factory.clone()) + .into_task_with(task_executor); + + let pipeline = self + .build_pipeline( + provider_factory, + config, + header_downloader, + body_downloader, + consensus, + max_block, + self.debug.continuous, + metrics_tx, + prune_config, + ) + .await?; + + Ok(pipeline) + } + + /// Returns the chain specific path to the data dir. + fn data_dir(&self) -> ChainPath { + self.datadir.unwrap_or_chain_default(self.chain.chain) + } + + /// Returns the path to the config file. + fn config_path(&self) -> PathBuf { + self.config.clone().unwrap_or_else(|| self.data_dir().config_path()) + } + + /// Loads the reth config with the given datadir root + fn load_config(&self) -> eyre::Result { + let config_path = self.config_path(); + let mut config = confy::load_path::(&config_path) + .wrap_err_with(|| format!("Could not load config file {:?}", config_path))?; + + info!(target: "reth::cli", path = ?config_path, "Configuration loaded"); + + // Update the config with the command line arguments + config.peers.connect_trusted_nodes_only = self.network.trusted_only; + + if !self.network.trusted_peers.is_empty() { + info!(target: "reth::cli", "Adding trusted nodes"); + self.network.trusted_peers.iter().for_each(|peer| { + config.peers.trusted_nodes.insert(*peer); + }); + } + + Ok(config) + } + + /// Loads the trusted setup params from a given file path or falls back to + /// `MAINNET_KZG_TRUSTED_SETUP`. + fn kzg_settings(&self) -> eyre::Result> { + if let Some(ref trusted_setup_file) = self.trusted_setup_file { + let trusted_setup = KzgSettings::load_trusted_setup_file(trusted_setup_file) + .map_err(LoadKzgSettingsError::KzgError)?; + Ok(Arc::new(trusted_setup)) + } else { + Ok(Arc::clone(&MAINNET_KZG_TRUSTED_SETUP)) + } + } + + fn install_prometheus_recorder(&self) -> eyre::Result { + prometheus_exporter::install_recorder() + } + + async fn start_metrics_endpoint( + &self, + prometheus_handle: PrometheusHandle, + db: Metrics, + ) -> eyre::Result<()> + where + Metrics: DatabaseMetrics + 'static + Send + Sync, + { + if let Some(listen_addr) = self.metrics { + info!(target: "reth::cli", addr = %listen_addr, "Starting metrics endpoint"); + prometheus_exporter::serve( + listen_addr, + prometheus_handle, + db, + metrics_process::Collector::default(), + ) + .await?; + } + + Ok(()) + } + + /// Spawns the configured network and associated tasks and returns the [NetworkHandle] connected + /// to that network. + fn start_network( + &self, + builder: NetworkBuilder, + task_executor: &TaskExecutor, + pool: Pool, + client: C, + default_peers_path: PathBuf, + ) -> NetworkHandle + where + C: BlockReader + HeaderProvider + Clone + Unpin + 'static, + Pool: TransactionPool + Unpin + 'static, + { + let (handle, network, txpool, eth) = + builder.transactions(pool).request_handler(client).split_with_handle(); + + task_executor.spawn_critical("p2p txpool", txpool); + task_executor.spawn_critical("p2p eth request handler", eth); + + let known_peers_file = self.network.persistent_peers_file(default_peers_path); + task_executor + .spawn_critical_with_graceful_shutdown_signal("p2p network task", |shutdown| { + run_network_until_shutdown(shutdown, network, known_peers_file) + }); + + handle + } + + /// Fetches the head block from the database. + /// + /// If the database is empty, returns the genesis block. + fn lookup_head(&self, factory: ProviderFactory) -> RethResult { + let provider = factory.provider()?; + + let head = provider.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default().block_number; + + let header = provider + .header_by_number(head)? + .expect("the header for the latest block is missing, database is corrupt"); + + let total_difficulty = provider + .header_td_by_number(head)? + .expect("the total difficulty for the latest block is missing, database is corrupt"); + + let hash = provider + .block_hash(head)? + .expect("the hash for the latest block is missing, database is corrupt"); + + Ok(Head { + number: head, + hash, + difficulty: header.difficulty, + total_difficulty, + timestamp: header.timestamp, + }) + } + + /// Attempt to look up the block number for the tip hash in the database. + /// If it doesn't exist, download the header and return the block number. + /// + /// NOTE: The download is attempted with infinite retries. + async fn lookup_or_fetch_tip( + &self, + provider_factory: ProviderFactory, + client: Client, + tip: B256, + ) -> RethResult + where + DB: Database, + Client: HeadersClient, + { + Ok(self.fetch_tip(provider_factory, client, BlockHashOrNumber::Hash(tip)).await?.number) + } + + /// Attempt to look up the block with the given number and return the header. + /// + /// NOTE: The download is attempted with infinite retries. + async fn fetch_tip( + &self, + factory: ProviderFactory, + client: Client, + tip: BlockHashOrNumber, + ) -> RethResult + where + DB: Database, + Client: HeadersClient, + { + let provider = factory.provider()?; + + let header = provider.header_by_hash_or_number(tip)?; + + // try to look up the header in the database + if let Some(header) = header { + info!(target: "reth::cli", ?tip, "Successfully looked up tip block in the database"); + return Ok(header.seal_slow()) + } + + info!(target: "reth::cli", ?tip, "Fetching tip block from the network."); + loop { + match get_single_header(&client, tip).await { + Ok(tip_header) => { + info!(target: "reth::cli", ?tip, "Successfully fetched tip"); + return Ok(tip_header) + } + Err(error) => { + error!(target: "reth::cli", %error, "Failed to fetch the tip. Retrying..."); + } + } + } + } + + fn load_network_config( + &self, + config: &Config, + provider_factory: ProviderFactory, + executor: TaskExecutor, + head: Head, + secret_key: SecretKey, + default_peers_path: PathBuf, + ) -> NetworkConfig> { + let cfg_builder = self + .network + .network_config(config, self.chain.clone(), secret_key, default_peers_path) + .with_task_executor(Box::new(executor)) + .set_head(head) + .listener_addr(SocketAddr::V4(SocketAddrV4::new( + self.network.addr, + // set discovery port based on instance number + self.network.port + self.instance - 1, + ))) + .discovery_addr(SocketAddr::V4(SocketAddrV4::new( + self.network.addr, + // set discovery port based on instance number + self.network.port + self.instance - 1, + ))); + + // When `sequencer_endpoint` is configured, the node will forward all transactions to a + // Sequencer node for execution and inclusion on L1, and disable its own txpool + // gossip to prevent other parties in the network from learning about them. + #[cfg(feature = "optimism")] + let cfg_builder = cfg_builder + .sequencer_endpoint(self.rollup.sequencer_http.clone()) + .disable_tx_gossip(self.rollup.disable_txpool_gossip); + + cfg_builder.build(provider_factory) + } + + #[allow(clippy::too_many_arguments)] + async fn build_pipeline( + &self, + provider_factory: ProviderFactory, + config: &StageConfig, + header_downloader: H, + body_downloader: B, + consensus: Arc, + max_block: Option, + continuous: bool, + metrics_tx: reth_stages::MetricEventsSender, + prune_config: Option, + ) -> eyre::Result> + where + DB: Database + Clone + 'static, + H: HeaderDownloader + 'static, + B: BodyDownloader + 'static, + { + let mut builder = Pipeline::builder(); + + if let Some(max_block) = max_block { + debug!(target: "reth::cli", max_block, "Configuring builder to use max block"); + builder = builder.with_max_block(max_block) + } + + let (tip_tx, tip_rx) = watch::channel(B256::ZERO); + use reth_revm_inspectors::stack::InspectorStackConfig; + let factory = reth_revm::EvmProcessorFactory::new(self.chain.clone()); + + let stack_config = InspectorStackConfig { + use_printer_tracer: self.debug.print_inspector, + hook: if let Some(hook_block) = self.debug.hook_block { + Hook::Block(hook_block) + } else if let Some(tx) = self.debug.hook_transaction { + Hook::Transaction(tx) + } else if self.debug.hook_all { + Hook::All + } else { + Hook::None + }, + }; + + let factory = factory.with_stack_config(stack_config); + + let prune_modes = prune_config.map(|prune| prune.segments).unwrap_or_default(); + + let header_mode = + if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) }; + let pipeline = builder + .with_tip_sender(tip_tx) + .with_metrics_tx(metrics_tx.clone()) + .add_stages( + DefaultStages::new( + provider_factory.clone(), + header_mode, + Arc::clone(&consensus), + header_downloader, + body_downloader, + factory.clone(), + ) + .set( + TotalDifficultyStage::new(consensus) + .with_commit_threshold(config.total_difficulty.commit_threshold), + ) + .set(SenderRecoveryStage { + commit_threshold: config.sender_recovery.commit_threshold, + }) + .set( + ExecutionStage::new( + factory, + ExecutionStageThresholds { + max_blocks: config.execution.max_blocks, + max_changes: config.execution.max_changes, + max_cumulative_gas: config.execution.max_cumulative_gas, + }, + config + .merkle + .clean_threshold + .max(config.account_hashing.clean_threshold) + .max(config.storage_hashing.clean_threshold), + prune_modes.clone(), + ) + .with_metrics_tx(metrics_tx), + ) + .set(AccountHashingStage::new( + config.account_hashing.clean_threshold, + config.account_hashing.commit_threshold, + )) + .set(StorageHashingStage::new( + config.storage_hashing.clean_threshold, + config.storage_hashing.commit_threshold, + )) + .set(MerkleStage::new_execution(config.merkle.clean_threshold)) + .set(TransactionLookupStage::new( + config.transaction_lookup.commit_threshold, + prune_modes.transaction_lookup, + )) + .set(IndexAccountHistoryStage::new( + config.index_account_history.commit_threshold, + prune_modes.account_history, + )) + .set(IndexStorageHistoryStage::new( + config.index_storage_history.commit_threshold, + prune_modes.storage_history, + )), + ) + .build(provider_factory); + + Ok(pipeline) + } + + /// Change rpc port numbers based on the instance number. + fn adjust_instance_ports(&mut self) { + // auth port is scaled by a factor of instance * 100 + self.rpc.auth_port += self.instance * 100 - 100; + // http port is scaled by a factor of -instance + self.rpc.http_port -= self.instance - 1; + // ws port is scaled by a factor of instance * 2 + self.rpc.ws_port += self.instance * 2 - 2; + } +} + +/// Drives the [NetworkManager] future until a [Shutdown](reth_tasks::shutdown::Shutdown) signal is +/// received. If configured, this writes known peers to `persistent_peers_file` afterwards. +async fn run_network_until_shutdown( + shutdown: reth_tasks::shutdown::GracefulShutdown, + network: NetworkManager, + persistent_peers_file: Option, +) where + C: BlockReader + HeaderProvider + Clone + Unpin + 'static, +{ + pin_mut!(network, shutdown); + + let mut graceful_guard = None; + tokio::select! { + _ = &mut network => {}, + guard = shutdown => { + graceful_guard = Some(guard); + }, + } + + if let Some(file_path) = persistent_peers_file { + let known_peers = network.all_peers().collect::>(); + if let Ok(known_peers) = serde_json::to_string_pretty(&known_peers) { + trace!(target: "reth::cli", peers_file =?file_path, num_peers=%known_peers.len(), "Saving current peers"); + let parent_dir = file_path.parent().map(fs::create_dir_all).transpose(); + match parent_dir.and_then(|_| fs::write(&file_path, known_peers)) { + Ok(_) => { + info!(target: "reth::cli", peers_file=?file_path, "Wrote network peers to file"); + } + Err(err) => { + warn!(target: "reth::cli", ?err, peers_file=?file_path, "Failed to write network peers to file"); + } + } + } + } + + drop(graceful_guard) } #[cfg(test)] @@ -363,7 +1149,7 @@ mod tests { #[test] fn parse_instance() { let mut cmd = NodeCommand::<()>::parse_from(["reth"]); - cmd.rpc.adjust_instance_ports(cmd.instance); + cmd.adjust_instance_ports(); cmd.network.port = DEFAULT_DISCOVERY_PORT + cmd.instance - 1; // check rpc port numbers assert_eq!(cmd.rpc.auth_port, 8551); @@ -373,7 +1159,7 @@ mod tests { assert_eq!(cmd.network.port, 30303); let mut cmd = NodeCommand::<()>::parse_from(["reth", "--instance", "2"]); - cmd.rpc.adjust_instance_ports(cmd.instance); + cmd.adjust_instance_ports(); cmd.network.port = DEFAULT_DISCOVERY_PORT + cmd.instance - 1; // check rpc port numbers assert_eq!(cmd.rpc.auth_port, 8651); @@ -383,7 +1169,7 @@ mod tests { assert_eq!(cmd.network.port, 30304); let mut cmd = NodeCommand::<()>::parse_from(["reth", "--instance", "3"]); - cmd.rpc.adjust_instance_ports(cmd.instance); + cmd.adjust_instance_ports(); cmd.network.port = DEFAULT_DISCOVERY_PORT + cmd.instance - 1; // check rpc port numbers assert_eq!(cmd.rpc.auth_port, 8751); diff --git a/bin/reth/src/utils.rs b/bin/reth/src/utils.rs index 38a824fb9..74af5288c 100644 --- a/bin/reth/src/utils.rs +++ b/bin/reth/src/utils.rs @@ -15,11 +15,9 @@ use reth_interfaces::p2p::{ headers::client::{HeadersClient, HeadersRequest}, priority::Priority, }; -use reth_network::NetworkManager; use reth_primitives::{ fs, BlockHashOrNumber, ChainSpec, HeadersDirection, SealedBlock, SealedHeader, }; -use reth_provider::BlockReader; use reth_rpc::{JwtError, JwtSecret}; use std::{ env::VarError, @@ -27,7 +25,7 @@ use std::{ rc::Rc, sync::Arc, }; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, info}; /// Exposing `open_db_read_only` function pub mod db { @@ -257,8 +255,8 @@ impl ListFilter { self.len = len; } } - /// Attempts to retrieve or create a JWT secret from the specified path. + pub fn get_or_create_jwt_secret_from_path(path: &Path) -> Result { if path.exists() { debug!(target: "reth::cli", ?path, "Reading JWT auth secret file"); @@ -268,26 +266,3 @@ pub fn get_or_create_jwt_secret_from_path(path: &Path) -> Result(network: &NetworkManager, persistent_peers_file: Option) -where - C: BlockReader + Unpin, -{ - if let Some(file_path) = persistent_peers_file { - let known_peers = network.all_peers().collect::>(); - if let Ok(known_peers) = serde_json::to_string_pretty(&known_peers) { - trace!(target: "reth::cli", peers_file =?file_path, num_peers=%known_peers.len(), "Saving current peers"); - let parent_dir = file_path.parent().map(fs::create_dir_all).transpose(); - match parent_dir.and_then(|_| fs::write(&file_path, known_peers)) { - Ok(_) => { - info!(target: "reth::cli", peers_file=?file_path, "Wrote network peers to file"); - } - Err(err) => { - warn!(target: "reth::cli", ?err, peers_file=?file_path, "Failed to write network peers to file"); - } - } - } - } -} diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 456b542e3..88cb8bdcb 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -33,7 +33,7 @@ use crate::{ transactions::NetworkTransactionEvent, FetchClient, NetworkBuilder, }; -use futures::{pin_mut, Future, StreamExt}; +use futures::{Future, StreamExt}; use parking_lot::Mutex; use reth_eth_wire::{ capability::{Capabilities, CapabilityMessage}, @@ -45,7 +45,6 @@ use reth_network_api::ReputationChangeKind; use reth_primitives::{ForkId, NodeRecord, PeerId, B256}; use reth_provider::{BlockNumReader, BlockReader}; use reth_rpc_types::{EthProtocolInfo, NetworkStatus}; -use reth_tasks::shutdown::GracefulShutdown; use reth_tokio_util::EventListeners; use secp256k1::SecretKey; use std::{ @@ -597,34 +596,6 @@ where } } -impl NetworkManager -where - C: BlockReader + Unpin, -{ - /// Drives the [NetworkManager] future until a [GracefulShutdown] signal is received. - /// - /// This also run the given function `shutdown_hook` afterwards. - pub async fn run_until_graceful_shutdown( - self, - shutdown: GracefulShutdown, - shutdown_hook: impl FnOnce(&mut Self), - ) { - let network = self; - pin_mut!(network, shutdown); - - let mut graceful_guard = None; - tokio::select! { - _ = &mut network => {}, - guard = shutdown => { - graceful_guard = Some(guard); - }, - } - - shutdown_hook(&mut network); - drop(graceful_guard); - } -} - impl Future for NetworkManager where C: BlockReader + Unpin, diff --git a/crates/storage/db/src/lib.rs b/crates/storage/db/src/lib.rs index a57779d4f..16ef51b4b 100644 --- a/crates/storage/db/src/lib.rs +++ b/crates/storage/db/src/lib.rs @@ -226,15 +226,9 @@ pub mod test_utils { } } - /// Get a temporary directory path to use for the database - pub fn tempdir_path() -> PathBuf { - let builder = tempfile::Builder::new().prefix("reth-test-").rand_bytes(8).tempdir(); - builder.expect(ERROR_TEMPDIR).into_path() - } - /// Create read/write database for testing pub fn create_test_rw_db() -> Arc> { - let path = tempdir_path(); + let path = tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path(); let emsg = format!("{}: {:?}", ERROR_DB_CREATION, path); let db = init_db(&path, None).expect(&emsg); @@ -251,7 +245,7 @@ pub mod test_utils { /// Create read only database for testing pub fn create_test_ro_db() -> Arc> { - let path = tempdir_path(); + let path = tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path(); { init_db(path.as_path(), None).expect(ERROR_DB_CREATION); } diff --git a/crates/tracing/src/lib.rs b/crates/tracing/src/lib.rs index 2fc551a5f..61481013f 100644 --- a/crates/tracing/src/lib.rs +++ b/crates/tracing/src/lib.rs @@ -33,8 +33,7 @@ pub type BoxedLayer = Box + Send + Sync>; /// Initializes a new [Subscriber] based on the given layers. pub fn init(layers: Vec>) { - // To avoid panicking in tests, we silently fail if we cannot initialize the subscriber. - let _ = tracing_subscriber::registry().with(layers).try_init(); + tracing_subscriber::registry().with(layers).init(); } /// Builds a new tracing layer that writes to stdout.