diff --git a/crates/node-builder/src/builder/mod.rs b/crates/node-builder/src/builder/mod.rs index 44bb60588..9649360eb 100644 --- a/crates/node-builder/src/builder/mod.rs +++ b/crates/node-builder/src/builder/mod.rs @@ -449,7 +449,7 @@ where ) -> eyre::Result, CB::Components>>> { let Self { builder, task_executor, data_dir } = self; - let launcher = DefaultNodeLauncher { task_executor, data_dir }; + let launcher = DefaultNodeLauncher::new(task_executor, data_dir); builder.launch_with(launcher).await } diff --git a/crates/node-builder/src/launch/common.rs b/crates/node-builder/src/launch/common.rs new file mode 100644 index 000000000..765673bf0 --- /dev/null +++ b/crates/node-builder/src/launch/common.rs @@ -0,0 +1,351 @@ +//! Helper types that can be used by launchers. + +use eyre::Context; +use rayon::ThreadPoolBuilder; +use reth_config::PruneConfig; +use reth_db::{database::Database, database_metrics::DatabaseMetrics}; +use reth_node_core::{ + cli::config::RethRpcConfig, + dirs::{ChainPath, DataDirPath}, + node_config::NodeConfig, +}; +use reth_primitives::{Chain, ChainSpec, Head, B256}; +use reth_provider::{providers::StaticFileProvider, ProviderFactory}; +use reth_rpc::JwtSecret; +use reth_tasks::TaskExecutor; +use reth_tracing::tracing::{error, info}; +use std::{cmp::max, sync::Arc, thread::available_parallelism}; + +/// Reusable setup for launching a node. +/// +/// This provides commonly used boilerplate for launching a node. +#[derive(Debug, Clone)] +pub struct LaunchContext { + /// The task executor for the node. + pub task_executor: TaskExecutor, + /// The data directory for the node. + pub data_dir: ChainPath, +} + +impl LaunchContext { + /// Create a new instance of the default node launcher. + pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath) -> Self { + Self { task_executor, data_dir } + } + + /// Attaches a database to the launch context. + pub fn with(self, database: DB) -> LaunchContextWith { + LaunchContextWith { inner: self, attachment: database } + } + + /// Loads the reth config with the configured `data_dir` and overrides settings according to the + /// `config`. + /// + /// Attaches both the `NodeConfig` and the loaded `reth.toml` config to the launch context. + pub fn with_loaded_toml_config( + self, + config: NodeConfig, + ) -> eyre::Result> { + let toml_config = self.load_toml_config(&config)?; + Ok(self.with(WithConfigs { config, toml_config })) + } + + /// Loads the reth config with the configured `data_dir` and overrides settings according to the + /// `config`. + pub fn load_toml_config(&self, config: &NodeConfig) -> eyre::Result { + let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config_path()); + + let mut toml_config = confy::load_path::(&config_path) + .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?; + + info!(target: "reth::cli", path = ?config_path, "Configuration loaded"); + + // Update the config with the command line arguments + toml_config.peers.trusted_nodes_only = config.network.trusted_only; + + if !config.network.trusted_peers.is_empty() { + info!(target: "reth::cli", "Adding trusted nodes"); + config.network.trusted_peers.iter().for_each(|peer| { + toml_config.peers.trusted_nodes.insert(*peer); + }); + } + + Ok(toml_config) + } + + /// Configure global settings this includes: + /// + /// - Raising the file descriptor limit + /// - Configuring the global rayon thread pool + pub fn configure_globals(&self) { + // Raise the fd limit of the process. + // Does not do anything on windows. + let _ = fdlimit::raise_fd_limit(); + + // Limit the global rayon thread pool, reserving 2 cores for the rest of the system + let _ = ThreadPoolBuilder::new() + .num_threads( + available_parallelism().map_or(25, |cpus| max(cpus.get().saturating_sub(2), 2)), + ) + .build_global() + .map_err(|e| error!("Failed to build global thread pool: {:?}", e)); + } +} + +/// A [LaunchContext] along with an additional value. +/// +/// This can be used to sequentially attach additional values to the type during the launch process. +/// +/// The type provides common boilerplate for launching a node depending on the additional value. +#[derive(Debug, Clone)] +pub struct LaunchContextWith { + /// The wrapped launch context. + pub inner: LaunchContext, + /// The additional attached value. + pub attachment: T, +} + +impl LaunchContextWith { + /// Configure global settings this includes: + /// + /// - Raising the file descriptor limit + /// - Configuring the global rayon thread pool + pub fn configure_globals(&self) { + self.inner.configure_globals(); + } + + /// Returns the data directory. + pub fn data_dir(&self) -> &ChainPath { + &self.inner.data_dir + } + + /// Returns the task executor. + pub fn task_executor(&self) -> &TaskExecutor { + &self.inner.task_executor + } + + /// Attaches another value to the launch context. + pub fn attach(self, attachment: A) -> LaunchContextWith> { + LaunchContextWith { + inner: self.inner, + attachment: Attached::new(self.attachment, attachment), + } + } +} + +impl LaunchContextWith> { + /// Get a reference to the left value. + pub const fn left(&self) -> &L { + &self.attachment.left + } + + /// Get a reference to the right value. + pub const fn right(&self) -> &R { + &self.attachment.right + } + + /// Get a mutable reference to the right value. + pub fn left_mut(&mut self) -> &mut L { + &mut self.attachment.left + } + + /// Get a mutable reference to the right value. + pub fn right_mut(&mut self) -> &mut R { + &mut self.attachment.right + } +} +impl LaunchContextWith> { + /// Returns the attached [NodeConfig]. + pub const fn node_config(&self) -> &NodeConfig { + &self.left().config + } + + /// Returns the attached [NodeConfig]. + pub fn node_config_mut(&mut self) -> &mut NodeConfig { + &mut self.left_mut().config + } + + /// Returns the attached toml config [reth_config::Config]. + pub const fn toml_config(&self) -> &reth_config::Config { + &self.left().toml_config + } + + /// Returns the attached toml config [reth_config::Config]. + pub fn toml_config_mut(&mut self) -> &mut reth_config::Config { + &mut self.left_mut().toml_config + } + + /// Returns the configured chain spec. + pub fn chain_spec(&self) -> Arc { + self.node_config().chain.clone() + } + + /// Get the hash of the genesis block. + pub fn genesis_hash(&self) -> B256 { + self.node_config().chain.genesis_hash() + } + + /// Returns the chain identifier of the node. + pub fn chain_id(&self) -> Chain { + self.node_config().chain.chain + } + + /// Returns true if the node is configured as --dev + pub fn is_dev(&self) -> bool { + self.node_config().dev.dev + } + + /// Returns the configured [PruneConfig] + pub fn prune_config(&self) -> eyre::Result> { + Ok(self.node_config().prune_config()?.or_else(|| self.toml_config().prune.clone())) + } + + /// Returns the initial pipeline target, based on whether or not the node is running in + /// `debug.tip` mode, `debug.continuous` mode, or neither. + /// + /// If running in `debug.tip` mode, the configured tip is returned. + /// Otherwise, if running in `debug.continuous` mode, the genesis hash is returned. + /// Otherwise, `None` is returned. This is what the node will do by default. + pub fn initial_pipeline_target(&self) -> Option { + self.node_config().initial_pipeline_target(self.genesis_hash()) + } + + /// Loads the JWT secret for the engine API + pub fn auth_jwt_secret(&self) -> eyre::Result { + let default_jwt_path = self.data_dir().jwt_path(); + let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?; + Ok(secret) + } +} + +impl LaunchContextWith> +where + DB: Clone, +{ + /// Returns the [ProviderFactory] for the attached database. + pub fn create_provider_factory(&self) -> eyre::Result> { + let factory = ProviderFactory::new( + self.right().clone(), + self.chain_spec(), + self.data_dir().static_files_path(), + )? + .with_static_files_metrics(); + + Ok(factory) + } + + /// Creates a new [ProviderFactory] and attaches it to the launch context. + pub fn with_provider_factory( + self, + ) -> eyre::Result>>> { + let factory = self.create_provider_factory()?; + let ctx = LaunchContextWith { + inner: self.inner, + attachment: self.attachment.map_right(|_| factory), + }; + + Ok(ctx) + } +} + +impl LaunchContextWith>> +where + DB: Database + DatabaseMetrics + Send + Sync + Clone + 'static, +{ + /// Returns access to the underlying database. + pub fn database(&self) -> &DB { + self.right().db_ref() + } + + /// Returns the configured ProviderFactory. + pub fn provider_factory(&self) -> &ProviderFactory { + self.right() + } + + /// Returns the static file provider to interact with the static files. + pub fn static_file_provider(&self) -> StaticFileProvider { + self.right().static_file_provider() + } + + /// Starts the prometheus endpoint. + pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> { + let prometheus_handle = self.node_config().install_prometheus_recorder()?; + self.node_config() + .start_metrics_endpoint( + prometheus_handle, + self.database().clone(), + self.static_file_provider(), + self.task_executor().clone(), + ) + .await + } + + /// Fetches the head block from the database. + /// + /// If the database is empty, returns the genesis block. + pub fn lookup_head(&self) -> eyre::Result { + self.node_config() + .lookup_head(self.provider_factory().clone()) + .wrap_err("the head block is missing") + } +} + +/// Joins two attachments together. +#[derive(Clone, Copy, Debug)] +pub struct Attached { + left: L, + right: R, +} + +impl Attached { + /// Creates a new `Attached` with the given values. + pub const fn new(left: L, right: R) -> Self { + Self { left, right } + } + + /// Maps the left value to a new value. + pub fn map_left(self, f: F) -> Attached + where + F: FnOnce(L) -> T, + { + Attached::new(f(self.left), self.right) + } + + /// Maps the right value to a new value. + pub fn map_right(self, f: F) -> Attached + where + F: FnOnce(R) -> T, + { + Attached::new(self.left, f(self.right)) + } + + /// Get a reference to the left value. + pub const fn left(&self) -> &L { + &self.left + } + + /// Get a reference to the right value. + pub const fn right(&self) -> &R { + &self.right + } + + /// Get a mutable reference to the right value. + pub fn left_mut(&mut self) -> &mut R { + &mut self.right + } + + /// Get a mutable reference to the right value. + pub fn right_mut(&mut self) -> &mut R { + &mut self.right + } +} + +/// Helper container type to bundle the initial [NodeConfig] and the loaded settings from the +/// reth.toml config +#[derive(Debug, Clone)] +pub struct WithConfigs { + /// The configured, usually derived from the CLI. + pub config: NodeConfig, + /// The loaded reth.toml config. + pub toml_config: reth_config::Config, +} diff --git a/crates/node-builder/src/launch.rs b/crates/node-builder/src/launch/mod.rs similarity index 67% rename from crates/node-builder/src/launch.rs rename to crates/node-builder/src/launch/mod.rs index 645598ada..6181e0c98 100644 --- a/crates/node-builder/src/launch.rs +++ b/crates/node-builder/src/launch/mod.rs @@ -7,9 +7,7 @@ use crate::{ node::FullNode, BuilderContext, NodeBuilderWithComponents, NodeHandle, RethFullAdapter, }; -use eyre::Context; use futures::{future, future::Either, stream, stream_select, StreamExt}; -use rayon::ThreadPoolBuilder; use reth_auto_seal_consensus::{AutoSealConsensus, MiningMode}; use reth_beacon_consensus::{ hooks::{EngineHooks, PruneHook, StaticFileHook}, @@ -29,32 +27,35 @@ use reth_interfaces::p2p::either::EitherDownloader; use reth_network::NetworkEvents; use reth_node_api::{FullNodeComponents, NodeTypes}; use reth_node_core::{ - cli::config::RethRpcConfig, dirs::{ChainPath, DataDirPath}, engine_api_store::EngineApiStore, engine_skip_fcu::EngineApiSkipFcu, exit::NodeExitFuture, init::init_genesis, - node_config::NodeConfig, }; use reth_node_events::{cl::ConsensusLayerHealthEvents, node}; use reth_primitives::format_ether; -use reth_provider::{providers::BlockchainProvider, CanonStateSubscriptions, ProviderFactory}; +use reth_provider::{providers::BlockchainProvider, CanonStateSubscriptions}; use reth_prune::PrunerBuilder; use reth_revm::EvmProcessorFactory; use reth_rpc_engine_api::EngineApi; use reth_static_file::StaticFileProducer; use reth_tasks::TaskExecutor; -use reth_tracing::tracing::{debug, error, info}; +use reth_tracing::tracing::{debug, info}; use reth_transaction_pool::TransactionPool; -use std::{cmp::max, future::Future, sync::Arc, thread::available_parallelism}; +use std::{future::Future, sync::Arc}; use tokio::sync::{mpsc::unbounded_channel, oneshot}; -/// Launches a new node. +pub mod common; +pub use common::LaunchContext; + +/// A general purpose trait that launches a new node of any kind. /// /// Acts as a node factory. /// /// This is essentially the launch logic for a node. +/// +/// See also [DefaultNodeLauncher] and [NodeBuilderWithComponents::launch_with] pub trait LaunchNode { /// The node type that is created. type Node; @@ -67,37 +68,13 @@ pub trait LaunchNode { #[derive(Debug)] pub struct DefaultNodeLauncher { /// The task executor for the node. - pub task_executor: TaskExecutor, - /// The data directory for the node. - pub data_dir: ChainPath, + pub ctx: LaunchContext, } impl DefaultNodeLauncher { /// Create a new instance of the default node launcher. pub fn new(task_executor: TaskExecutor, data_dir: ChainPath) -> Self { - Self { task_executor, data_dir } - } - - /// Loads the reth config with the given datadir root - fn load_toml_config(&self, config: &NodeConfig) -> eyre::Result { - let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config_path()); - - let mut toml_config = confy::load_path::(&config_path) - .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?; - - info!(target: "reth::cli", path = ?config_path, "Configuration loaded"); - - // Update the config with the command line arguments - toml_config.peers.trusted_nodes_only = config.network.trusted_only; - - if !config.network.trusted_peers.is_empty() { - info!(target: "reth::cli", "Adding trusted nodes"); - config.network.trusted_peers.iter().for_each(|peer| { - toml_config.peers.trusted_nodes.insert(*peer); - }); - } - - Ok(toml_config) + Self { ctx: LaunchContext::new(task_executor, data_dir) } } } @@ -114,6 +91,7 @@ where self, target: NodeBuilderWithComponents, CB>, ) -> eyre::Result { + let Self { ctx } = self; let NodeBuilderWithComponents { adapter: NodeTypesAdapter { types, database }, components_builder, @@ -121,74 +99,53 @@ where config, } = target; - // get config from file - let reth_config = self.load_toml_config(&config)?; + // configure globals + ctx.configure_globals(); - let Self { task_executor, data_dir } = self; + let mut ctx = ctx + // load the toml config + .with_loaded_toml_config(config)? + // attach the database + .attach(database.clone()) + // Create the provider factory + .with_provider_factory()?; - // Raise the fd limit of the process. - // Does not do anything on windows. - fdlimit::raise_fd_limit()?; - - // Limit the global rayon thread pool, reserving 2 cores for the rest of the system - let _ = ThreadPoolBuilder::new() - .num_threads( - available_parallelism().map_or(25, |cpus| max(cpus.get().saturating_sub(2), 2)), - ) - .build_global() - .map_err(|e| error!("Failed to build global thread pool: {:?}", e)); - - let provider_factory = ProviderFactory::new( - database.clone(), - Arc::clone(&config.chain), - data_dir.static_files_path(), - )? - .with_static_files_metrics(); info!(target: "reth::cli", "Database opened"); - let prometheus_handle = config.install_prometheus_recorder()?; - config - .start_metrics_endpoint( - prometheus_handle, - database.clone(), - provider_factory.static_file_provider(), - task_executor.clone(), - ) - .await?; + ctx.start_prometheus_endpoint().await?; - debug!(target: "reth::cli", chain=%config.chain.chain, -genesis=?config.chain.genesis_hash(), "Initializing genesis"); + debug!(target: "reth::cli", chain=%ctx.chain_id(), genesis=?ctx.genesis_hash(), "Initializing genesis"); - let genesis_hash = init_genesis(provider_factory.clone())?; + init_genesis(ctx.provider_factory().clone())?; - info!(target: "reth::cli", "\n{}", config.chain.display_hardforks()); + info!(target: "reth::cli", "\n{}", ctx.chain_spec().display_hardforks()); // setup the consensus instance - let consensus: Arc = if config.dev.dev { - Arc::new(AutoSealConsensus::new(Arc::clone(&config.chain))) + let consensus: Arc = if ctx.is_dev() { + Arc::new(AutoSealConsensus::new(ctx.chain_spec())) } else { - Arc::new(BeaconConsensus::new(Arc::clone(&config.chain))) + Arc::new(BeaconConsensus::new(ctx.chain_spec())) }; debug!(target: "reth::cli", "Spawning stages metrics listener task"); let (sync_metrics_tx, sync_metrics_rx) = unbounded_channel(); let sync_metrics_listener = reth_stages::MetricsListener::new(sync_metrics_rx); - task_executor.spawn_critical("stages metrics listener task", sync_metrics_listener); + ctx.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener); - let prune_config = config.prune_config()?.or_else(|| reth_config.prune.clone()); + let prune_config = ctx.prune_config()?; // Configure the blockchain tree for the node let evm_config = types.evm_config(); let tree_config = BlockchainTreeConfig::default(); let tree_externals = TreeExternals::new( - provider_factory.clone(), + ctx.provider_factory().clone(), consensus.clone(), - EvmProcessorFactory::new(config.chain.clone(), evm_config.clone()), + EvmProcessorFactory::new(ctx.chain_spec(), evm_config.clone()), ); let tree = BlockchainTree::new( tree_externals, tree_config, - prune_config.as_ref().map(|config| config.segments.clone()), + prune_config.as_ref().map(|prune| prune.segments.clone()), )? .with_sync_metrics_tx(sync_metrics_tx.clone()); @@ -197,40 +154,30 @@ genesis=?config.chain.genesis_hash(), "Initializing genesis"); debug!(target: "reth::cli", "configured blockchain tree"); // fetch the head block from the database - let head = - config.lookup_head(provider_factory.clone()).wrap_err("the head block is missing")?; + let head = ctx.lookup_head()?; // setup the blockchain provider let blockchain_db = - BlockchainProvider::new(provider_factory.clone(), blockchain_tree.clone())?; + BlockchainProvider::new(ctx.provider_factory().clone(), blockchain_tree.clone())?; - let ctx = BuilderContext::new( + let builder_ctx = BuilderContext::new( head, - blockchain_db, - task_executor, - data_dir, - config, - reth_config, + blockchain_db.clone(), + ctx.task_executor().clone(), + ctx.data_dir().clone(), + ctx.node_config().clone(), + ctx.toml_config().clone(), evm_config.clone(), ); debug!(target: "reth::cli", "creating components"); - let components = components_builder.build_components(&ctx).await?; - - let BuilderContext { - provider: blockchain_db, - executor, - data_dir, - mut config, - mut reth_config, - .. - } = ctx; + let components = components_builder.build_components(&builder_ctx).await?; let NodeHooks { on_component_initialized, on_node_started, .. } = hooks; let node_adapter = NodeAdapter { components, - task_executor: executor.clone(), + task_executor: ctx.task_executor().clone(), provider: blockchain_db.clone(), evm: evm_config.clone(), }; @@ -250,16 +197,16 @@ genesis=?config.chain.genesis_hash(), "Initializing genesis"); let context = ExExContext { head, provider: blockchain_db.clone(), - task_executor: executor.clone(), - data_dir: data_dir.clone(), - config: config.clone(), - reth_config: reth_config.clone(), + task_executor: ctx.task_executor().clone(), + data_dir: ctx.data_dir().clone(), + config: ctx.node_config().clone(), + reth_config: ctx.toml_config().clone(), pool: node_adapter.components.pool().clone(), events, notifications, }; - let executor = executor.clone(); + let executor = ctx.task_executor().clone(); exexs.push(async move { debug!(target: "reth::cli", id, "spawning exex"); let span = reth_tracing::tracing::info_span!("exex", id); @@ -287,21 +234,24 @@ genesis=?config.chain.genesis_hash(), "Initializing genesis"); // todo(onbjerg): rm magic number let exex_manager = ExExManager::new(exex_handles, 1024); let exex_manager_handle = exex_manager.handle(); - executor.spawn_critical("exex manager", async move { + ctx.task_executor().spawn_critical("exex manager", async move { exex_manager.await.expect("exex manager crashed"); }); // send notifications from the blockchain tree to exex manager let mut canon_state_notifications = blockchain_tree.subscribe_to_canonical_state(); let mut handle = exex_manager_handle.clone(); - executor.spawn_critical("exex manager blockchain tree notifications", async move { - while let Ok(notification) = canon_state_notifications.recv().await { - handle.send_async(notification.into()).await.expect( - "blockchain tree notification could not be sent to exex + ctx.task_executor().spawn_critical( + "exex manager blockchain tree notifications", + async move { + while let Ok(notification) = canon_state_notifications.recv().await { + handle.send_async(notification.into()).await.expect( + "blockchain tree notification could not be sent to exex manager", - ); - } - }); + ); + } + }, + ); info!(target: "reth::cli", "ExEx Manager started"); @@ -314,52 +264,59 @@ manager", let network_client = node_adapter.network().fetch_client().await?; let (consensus_engine_tx, mut consensus_engine_rx) = unbounded_channel(); - if let Some(skip_fcu_threshold) = config.debug.skip_fcu { + if let Some(skip_fcu_threshold) = ctx.node_config().debug.skip_fcu { debug!(target: "reth::cli", "spawning skip FCU task"); let (skip_fcu_tx, skip_fcu_rx) = unbounded_channel(); let engine_skip_fcu = EngineApiSkipFcu::new(skip_fcu_threshold); - executor.spawn_critical( + ctx.task_executor().spawn_critical( "skip FCU interceptor", engine_skip_fcu.intercept(consensus_engine_rx, skip_fcu_tx), ); consensus_engine_rx = skip_fcu_rx; } - if let Some(store_path) = config.debug.engine_api_store.clone() { + if let Some(store_path) = ctx.node_config().debug.engine_api_store.clone() { debug!(target: "reth::cli", "spawning engine API store"); let (engine_intercept_tx, engine_intercept_rx) = unbounded_channel(); let engine_api_store = EngineApiStore::new(store_path); - executor.spawn_critical( + ctx.task_executor().spawn_critical( "engine api interceptor", engine_api_store.intercept(consensus_engine_rx, engine_intercept_tx), ); consensus_engine_rx = engine_intercept_rx; }; - let max_block = config.max_block(network_client.clone(), provider_factory.clone()).await?; + let max_block = ctx + .node_config() + .max_block(network_client.clone(), ctx.provider_factory().clone()) + .await?; let mut hooks = EngineHooks::new(); let static_file_producer = StaticFileProducer::new( - provider_factory.clone(), - provider_factory.static_file_provider(), + ctx.provider_factory().clone(), + ctx.static_file_provider(), prune_config.clone().unwrap_or_default().segments, ); let static_file_producer_events = static_file_producer.lock().events(); - hooks.add(StaticFileHook::new(static_file_producer.clone(), Box::new(executor.clone()))); + hooks.add(StaticFileHook::new( + static_file_producer.clone(), + Box::new(ctx.task_executor().clone()), + )); info!(target: "reth::cli", "StaticFileProducer initialized"); // Make sure ETL doesn't default to /tmp/, but to whatever datadir is set to - if reth_config.stages.etl.dir.is_none() { - reth_config.stages.etl.dir = Some(EtlConfig::from_datadir(&data_dir.data_dir_path())); + if ctx.toml_config_mut().stages.etl.dir.is_none() { + ctx.toml_config_mut().stages.etl.dir = + Some(EtlConfig::from_datadir(&ctx.data_dir().data_dir_path())); } // Configure the pipeline let pipeline_exex_handle = exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty); - let (mut pipeline, client) = if config.dev.dev { + let (mut pipeline, client) = if ctx.is_dev() { info!(target: "reth::cli", "Starting Reth in dev mode"); - for (idx, (address, alloc)) in config.chain.genesis.alloc.iter().enumerate() { + for (idx, (address, alloc)) in ctx.chain_spec().genesis.alloc.iter().enumerate() { info!(target: "reth::cli", "Allocated Genesis Account: {:02}. {} ({} ETH)", idx, address.to_string(), format_ether(alloc.balance)); } @@ -368,9 +325,9 @@ address.to_string(), format_ether(alloc.balance)); let pending_transactions_listener = node_adapter.components.pool().pending_transactions_listener(); - let mining_mode = if let Some(interval) = config.dev.block_time { + let mining_mode = if let Some(interval) = ctx.node_config().dev.block_time { MiningMode::interval(interval) - } else if let Some(max_transactions) = config.dev.block_max_transactions { + } else if let Some(max_transactions) = ctx.node_config().dev.block_max_transactions { MiningMode::instant(max_transactions, pending_transactions_listener) } else { info!(target: "reth::cli", "No mining mode specified, defaulting to @@ -379,7 +336,7 @@ ReadyTransaction"); }; let (_, client, mut task) = reth_auto_seal_consensus::AutoSealBuilder::new( - Arc::clone(&config.chain), + ctx.chain_spec(), blockchain_db.clone(), node_adapter.components.pool().clone(), consensus_engine_tx.clone(), @@ -390,12 +347,12 @@ ReadyTransaction"); .build(); let mut pipeline = crate::setup::build_networked_pipeline( - &config, - &reth_config.stages, + ctx.node_config(), + &ctx.toml_config().stages, client.clone(), Arc::clone(&consensus), - provider_factory.clone(), - &executor, + ctx.provider_factory().clone(), + ctx.task_executor(), sync_metrics_tx, prune_config.clone(), max_block, @@ -408,17 +365,17 @@ ReadyTransaction"); let pipeline_events = pipeline.events(); task.set_pipeline_events(pipeline_events); debug!(target: "reth::cli", "Spawning auto mine task"); - executor.spawn(Box::pin(task)); + ctx.task_executor().spawn(Box::pin(task)); (pipeline, EitherDownloader::Left(client)) } else { let pipeline = crate::setup::build_networked_pipeline( - &config, - &reth_config.stages, + ctx.node_config(), + &ctx.toml_config().stages, network_client.clone(), Arc::clone(&consensus), - provider_factory.clone(), - &executor, + ctx.provider_factory().clone(), + ctx.task_executor(), sync_metrics_tx, prune_config.clone(), max_block, @@ -433,22 +390,22 @@ ReadyTransaction"); let pipeline_events = pipeline.events(); - let initial_target = config.initial_pipeline_target(genesis_hash); + let initial_target = ctx.initial_pipeline_target(); let prune_config = prune_config.unwrap_or_default(); let mut pruner_builder = PrunerBuilder::new(prune_config.clone()) .max_reorg_depth(tree_config.max_reorg_depth() as usize) - .prune_delete_limit(config.chain.prune_delete_limit) + .prune_delete_limit(ctx.chain_spec().prune_delete_limit) .timeout(PrunerBuilder::DEFAULT_TIMEOUT); if let Some(exex_manager_handle) = &exex_manager_handle { pruner_builder = pruner_builder.finished_exex_height(exex_manager_handle.finished_height()); } - let mut pruner = pruner_builder.build(provider_factory.clone()); + let mut pruner = pruner_builder.build(ctx.provider_factory().clone()); let pruner_events = pruner.events(); - hooks.add(PruneHook::new(pruner, Box::new(executor.clone()))); + hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor().clone()))); info!(target: "reth::cli", ?prune_config, "Pruner initialized"); // Configure the consensus engine @@ -456,10 +413,10 @@ ReadyTransaction"); client, pipeline, blockchain_db.clone(), - Box::new(executor.clone()), + Box::new(ctx.task_executor().clone()), Box::new(node_adapter.components.network().clone()), max_block, - config.debug.continuous, + ctx.node_config().debug.continuous, node_adapter.components.payload_builder().clone(), initial_target, reth_beacon_consensus::MIN_BLOCKS_FOR_PIPELINE_RUN, @@ -473,7 +430,7 @@ ReadyTransaction"); node_adapter.components.network().event_listener().map(Into::into), beacon_engine_handle.event_listener().map(Into::into), pipeline_events.map(Into::into), - if config.debug.tip.is_none() && !config.dev.dev { + if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() { Either::Left( ConsensusLayerHealthEvents::new(Box::new(blockchain_db.clone())) .map(Into::into), @@ -484,7 +441,7 @@ ReadyTransaction"); pruner_events.map(Into::into), static_file_producer_events.map(Into::into) ); - executor.spawn_critical( + ctx.task_executor().spawn_critical( "events task", node::handle_events( Some(node_adapter.components.network().clone()), @@ -496,39 +453,38 @@ ReadyTransaction"); let engine_api = EngineApi::new( blockchain_db.clone(), - config.chain.clone(), + ctx.chain_spec(), beacon_engine_handle, node_adapter.components.payload_builder().clone().into(), - Box::new(executor.clone()), + Box::new(ctx.task_executor().clone()), ); info!(target: "reth::cli", "Engine API handler initialized"); // extract the jwt secret from the args if possible - let default_jwt_path = data_dir.jwt_path(); - let jwt_secret = config.rpc.auth_jwt_secret(default_jwt_path)?; + let jwt_secret = ctx.auth_jwt_secret()?; // adjust rpc port numbers based on instance number - config.adjust_instance_ports(); + ctx.node_config_mut().adjust_instance_ports(); // Start RPC servers let (rpc_server_handles, mut rpc_registry) = crate::rpc::launch_rpc_servers( node_adapter.clone(), engine_api, - &config, + ctx.node_config(), jwt_secret, rpc, ) .await?; // in dev mode we generate 20 random dev-signer accounts - if config.dev.dev { + if ctx.is_dev() { rpc_registry.eth_api().with_dev_accounts(); } // Run consensus engine to completion let (tx, rx) = oneshot::channel(); info!(target: "reth::cli", "Starting consensus engine"); - executor.spawn_critical_blocking("consensus engine", async move { + ctx.task_executor().spawn_critical_blocking("consensus engine", async move { let res = beacon_consensus_engine.await; let _ = tx.send(res); }); @@ -539,11 +495,11 @@ ReadyTransaction"); network: node_adapter.components.network().clone(), provider: node_adapter.provider.clone(), payload_builder: node_adapter.components.payload_builder().clone(), - task_executor: executor, + task_executor: ctx.task_executor().clone(), rpc_server_handles, rpc_registry, - config, - data_dir, + config: ctx.node_config().clone(), + data_dir: ctx.data_dir().clone(), }; // Notify on node started on_node_started.on_event(full_node.clone())?;