mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat: LaunchContext helpers (#7884)
This commit is contained in:
@ -449,7 +449,7 @@ where
|
||||
) -> eyre::Result<NodeHandle<NodeAdapter<RethFullAdapter<DB, T>, CB::Components>>> {
|
||||
let Self { builder, task_executor, data_dir } = self;
|
||||
|
||||
let launcher = DefaultNodeLauncher { task_executor, data_dir };
|
||||
let launcher = DefaultNodeLauncher::new(task_executor, data_dir);
|
||||
builder.launch_with(launcher).await
|
||||
}
|
||||
|
||||
|
||||
351
crates/node-builder/src/launch/common.rs
Normal file
351
crates/node-builder/src/launch/common.rs
Normal file
@ -0,0 +1,351 @@
|
||||
//! Helper types that can be used by launchers.
|
||||
|
||||
use eyre::Context;
|
||||
use rayon::ThreadPoolBuilder;
|
||||
use reth_config::PruneConfig;
|
||||
use reth_db::{database::Database, database_metrics::DatabaseMetrics};
|
||||
use reth_node_core::{
|
||||
cli::config::RethRpcConfig,
|
||||
dirs::{ChainPath, DataDirPath},
|
||||
node_config::NodeConfig,
|
||||
};
|
||||
use reth_primitives::{Chain, ChainSpec, Head, B256};
|
||||
use reth_provider::{providers::StaticFileProvider, ProviderFactory};
|
||||
use reth_rpc::JwtSecret;
|
||||
use reth_tasks::TaskExecutor;
|
||||
use reth_tracing::tracing::{error, info};
|
||||
use std::{cmp::max, sync::Arc, thread::available_parallelism};
|
||||
|
||||
/// Reusable setup for launching a node.
|
||||
///
|
||||
/// This provides commonly used boilerplate for launching a node.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct LaunchContext {
|
||||
/// The task executor for the node.
|
||||
pub task_executor: TaskExecutor,
|
||||
/// The data directory for the node.
|
||||
pub data_dir: ChainPath<DataDirPath>,
|
||||
}
|
||||
|
||||
impl LaunchContext {
|
||||
/// Create a new instance of the default node launcher.
|
||||
pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
|
||||
Self { task_executor, data_dir }
|
||||
}
|
||||
|
||||
/// Attaches a database to the launch context.
|
||||
pub fn with<DB>(self, database: DB) -> LaunchContextWith<DB> {
|
||||
LaunchContextWith { inner: self, attachment: database }
|
||||
}
|
||||
|
||||
/// Loads the reth config with the configured `data_dir` and overrides settings according to the
|
||||
/// `config`.
|
||||
///
|
||||
/// Attaches both the `NodeConfig` and the loaded `reth.toml` config to the launch context.
|
||||
pub fn with_loaded_toml_config(
|
||||
self,
|
||||
config: NodeConfig,
|
||||
) -> eyre::Result<LaunchContextWith<WithConfigs>> {
|
||||
let toml_config = self.load_toml_config(&config)?;
|
||||
Ok(self.with(WithConfigs { config, toml_config }))
|
||||
}
|
||||
|
||||
/// Loads the reth config with the configured `data_dir` and overrides settings according to the
|
||||
/// `config`.
|
||||
pub fn load_toml_config(&self, config: &NodeConfig) -> eyre::Result<reth_config::Config> {
|
||||
let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config_path());
|
||||
|
||||
let mut toml_config = confy::load_path::<reth_config::Config>(&config_path)
|
||||
.wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
|
||||
|
||||
info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
|
||||
|
||||
// Update the config with the command line arguments
|
||||
toml_config.peers.trusted_nodes_only = config.network.trusted_only;
|
||||
|
||||
if !config.network.trusted_peers.is_empty() {
|
||||
info!(target: "reth::cli", "Adding trusted nodes");
|
||||
config.network.trusted_peers.iter().for_each(|peer| {
|
||||
toml_config.peers.trusted_nodes.insert(*peer);
|
||||
});
|
||||
}
|
||||
|
||||
Ok(toml_config)
|
||||
}
|
||||
|
||||
/// Configure global settings this includes:
|
||||
///
|
||||
/// - Raising the file descriptor limit
|
||||
/// - Configuring the global rayon thread pool
|
||||
pub fn configure_globals(&self) {
|
||||
// Raise the fd limit of the process.
|
||||
// Does not do anything on windows.
|
||||
let _ = fdlimit::raise_fd_limit();
|
||||
|
||||
// Limit the global rayon thread pool, reserving 2 cores for the rest of the system
|
||||
let _ = ThreadPoolBuilder::new()
|
||||
.num_threads(
|
||||
available_parallelism().map_or(25, |cpus| max(cpus.get().saturating_sub(2), 2)),
|
||||
)
|
||||
.build_global()
|
||||
.map_err(|e| error!("Failed to build global thread pool: {:?}", e));
|
||||
}
|
||||
}
|
||||
|
||||
/// A [LaunchContext] along with an additional value.
|
||||
///
|
||||
/// This can be used to sequentially attach additional values to the type during the launch process.
|
||||
///
|
||||
/// The type provides common boilerplate for launching a node depending on the additional value.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct LaunchContextWith<T> {
|
||||
/// The wrapped launch context.
|
||||
pub inner: LaunchContext,
|
||||
/// The additional attached value.
|
||||
pub attachment: T,
|
||||
}
|
||||
|
||||
impl<T> LaunchContextWith<T> {
|
||||
/// Configure global settings this includes:
|
||||
///
|
||||
/// - Raising the file descriptor limit
|
||||
/// - Configuring the global rayon thread pool
|
||||
pub fn configure_globals(&self) {
|
||||
self.inner.configure_globals();
|
||||
}
|
||||
|
||||
/// Returns the data directory.
|
||||
pub fn data_dir(&self) -> &ChainPath<DataDirPath> {
|
||||
&self.inner.data_dir
|
||||
}
|
||||
|
||||
/// Returns the task executor.
|
||||
pub fn task_executor(&self) -> &TaskExecutor {
|
||||
&self.inner.task_executor
|
||||
}
|
||||
|
||||
/// Attaches another value to the launch context.
|
||||
pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
|
||||
LaunchContextWith {
|
||||
inner: self.inner,
|
||||
attachment: Attached::new(self.attachment, attachment),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<L, R> LaunchContextWith<Attached<L, R>> {
|
||||
/// Get a reference to the left value.
|
||||
pub const fn left(&self) -> &L {
|
||||
&self.attachment.left
|
||||
}
|
||||
|
||||
/// Get a reference to the right value.
|
||||
pub const fn right(&self) -> &R {
|
||||
&self.attachment.right
|
||||
}
|
||||
|
||||
/// Get a mutable reference to the right value.
|
||||
pub fn left_mut(&mut self) -> &mut L {
|
||||
&mut self.attachment.left
|
||||
}
|
||||
|
||||
/// Get a mutable reference to the right value.
|
||||
pub fn right_mut(&mut self) -> &mut R {
|
||||
&mut self.attachment.right
|
||||
}
|
||||
}
|
||||
impl<R> LaunchContextWith<Attached<WithConfigs, R>> {
|
||||
/// Returns the attached [NodeConfig].
|
||||
pub const fn node_config(&self) -> &NodeConfig {
|
||||
&self.left().config
|
||||
}
|
||||
|
||||
/// Returns the attached [NodeConfig].
|
||||
pub fn node_config_mut(&mut self) -> &mut NodeConfig {
|
||||
&mut self.left_mut().config
|
||||
}
|
||||
|
||||
/// Returns the attached toml config [reth_config::Config].
|
||||
pub const fn toml_config(&self) -> &reth_config::Config {
|
||||
&self.left().toml_config
|
||||
}
|
||||
|
||||
/// Returns the attached toml config [reth_config::Config].
|
||||
pub fn toml_config_mut(&mut self) -> &mut reth_config::Config {
|
||||
&mut self.left_mut().toml_config
|
||||
}
|
||||
|
||||
/// Returns the configured chain spec.
|
||||
pub fn chain_spec(&self) -> Arc<ChainSpec> {
|
||||
self.node_config().chain.clone()
|
||||
}
|
||||
|
||||
/// Get the hash of the genesis block.
|
||||
pub fn genesis_hash(&self) -> B256 {
|
||||
self.node_config().chain.genesis_hash()
|
||||
}
|
||||
|
||||
/// Returns the chain identifier of the node.
|
||||
pub fn chain_id(&self) -> Chain {
|
||||
self.node_config().chain.chain
|
||||
}
|
||||
|
||||
/// Returns true if the node is configured as --dev
|
||||
pub fn is_dev(&self) -> bool {
|
||||
self.node_config().dev.dev
|
||||
}
|
||||
|
||||
/// Returns the configured [PruneConfig]
|
||||
pub fn prune_config(&self) -> eyre::Result<Option<PruneConfig>> {
|
||||
Ok(self.node_config().prune_config()?.or_else(|| self.toml_config().prune.clone()))
|
||||
}
|
||||
|
||||
/// Returns the initial pipeline target, based on whether or not the node is running in
|
||||
/// `debug.tip` mode, `debug.continuous` mode, or neither.
|
||||
///
|
||||
/// If running in `debug.tip` mode, the configured tip is returned.
|
||||
/// Otherwise, if running in `debug.continuous` mode, the genesis hash is returned.
|
||||
/// Otherwise, `None` is returned. This is what the node will do by default.
|
||||
pub fn initial_pipeline_target(&self) -> Option<B256> {
|
||||
self.node_config().initial_pipeline_target(self.genesis_hash())
|
||||
}
|
||||
|
||||
/// Loads the JWT secret for the engine API
|
||||
pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
|
||||
let default_jwt_path = self.data_dir().jwt_path();
|
||||
let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
|
||||
Ok(secret)
|
||||
}
|
||||
}
|
||||
|
||||
impl<DB> LaunchContextWith<Attached<WithConfigs, DB>>
|
||||
where
|
||||
DB: Clone,
|
||||
{
|
||||
/// Returns the [ProviderFactory] for the attached database.
|
||||
pub fn create_provider_factory(&self) -> eyre::Result<ProviderFactory<DB>> {
|
||||
let factory = ProviderFactory::new(
|
||||
self.right().clone(),
|
||||
self.chain_spec(),
|
||||
self.data_dir().static_files_path(),
|
||||
)?
|
||||
.with_static_files_metrics();
|
||||
|
||||
Ok(factory)
|
||||
}
|
||||
|
||||
/// Creates a new [ProviderFactory] and attaches it to the launch context.
|
||||
pub fn with_provider_factory(
|
||||
self,
|
||||
) -> eyre::Result<LaunchContextWith<Attached<WithConfigs, ProviderFactory<DB>>>> {
|
||||
let factory = self.create_provider_factory()?;
|
||||
let ctx = LaunchContextWith {
|
||||
inner: self.inner,
|
||||
attachment: self.attachment.map_right(|_| factory),
|
||||
};
|
||||
|
||||
Ok(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<DB> LaunchContextWith<Attached<WithConfigs, ProviderFactory<DB>>>
|
||||
where
|
||||
DB: Database + DatabaseMetrics + Send + Sync + Clone + 'static,
|
||||
{
|
||||
/// Returns access to the underlying database.
|
||||
pub fn database(&self) -> &DB {
|
||||
self.right().db_ref()
|
||||
}
|
||||
|
||||
/// Returns the configured ProviderFactory.
|
||||
pub fn provider_factory(&self) -> &ProviderFactory<DB> {
|
||||
self.right()
|
||||
}
|
||||
|
||||
/// Returns the static file provider to interact with the static files.
|
||||
pub fn static_file_provider(&self) -> StaticFileProvider {
|
||||
self.right().static_file_provider()
|
||||
}
|
||||
|
||||
/// Starts the prometheus endpoint.
|
||||
pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
|
||||
let prometheus_handle = self.node_config().install_prometheus_recorder()?;
|
||||
self.node_config()
|
||||
.start_metrics_endpoint(
|
||||
prometheus_handle,
|
||||
self.database().clone(),
|
||||
self.static_file_provider(),
|
||||
self.task_executor().clone(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Fetches the head block from the database.
|
||||
///
|
||||
/// If the database is empty, returns the genesis block.
|
||||
pub fn lookup_head(&self) -> eyre::Result<Head> {
|
||||
self.node_config()
|
||||
.lookup_head(self.provider_factory().clone())
|
||||
.wrap_err("the head block is missing")
|
||||
}
|
||||
}
|
||||
|
||||
/// Joins two attachments together.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct Attached<L, R> {
|
||||
left: L,
|
||||
right: R,
|
||||
}
|
||||
|
||||
impl<L, R> Attached<L, R> {
|
||||
/// Creates a new `Attached` with the given values.
|
||||
pub const fn new(left: L, right: R) -> Self {
|
||||
Self { left, right }
|
||||
}
|
||||
|
||||
/// Maps the left value to a new value.
|
||||
pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
|
||||
where
|
||||
F: FnOnce(L) -> T,
|
||||
{
|
||||
Attached::new(f(self.left), self.right)
|
||||
}
|
||||
|
||||
/// Maps the right value to a new value.
|
||||
pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
|
||||
where
|
||||
F: FnOnce(R) -> T,
|
||||
{
|
||||
Attached::new(self.left, f(self.right))
|
||||
}
|
||||
|
||||
/// Get a reference to the left value.
|
||||
pub const fn left(&self) -> &L {
|
||||
&self.left
|
||||
}
|
||||
|
||||
/// Get a reference to the right value.
|
||||
pub const fn right(&self) -> &R {
|
||||
&self.right
|
||||
}
|
||||
|
||||
/// Get a mutable reference to the right value.
|
||||
pub fn left_mut(&mut self) -> &mut R {
|
||||
&mut self.right
|
||||
}
|
||||
|
||||
/// Get a mutable reference to the right value.
|
||||
pub fn right_mut(&mut self) -> &mut R {
|
||||
&mut self.right
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper container type to bundle the initial [NodeConfig] and the loaded settings from the
|
||||
/// reth.toml config
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct WithConfigs {
|
||||
/// The configured, usually derived from the CLI.
|
||||
pub config: NodeConfig,
|
||||
/// The loaded reth.toml config.
|
||||
pub toml_config: reth_config::Config,
|
||||
}
|
||||
@ -7,9 +7,7 @@ use crate::{
|
||||
node::FullNode,
|
||||
BuilderContext, NodeBuilderWithComponents, NodeHandle, RethFullAdapter,
|
||||
};
|
||||
use eyre::Context;
|
||||
use futures::{future, future::Either, stream, stream_select, StreamExt};
|
||||
use rayon::ThreadPoolBuilder;
|
||||
use reth_auto_seal_consensus::{AutoSealConsensus, MiningMode};
|
||||
use reth_beacon_consensus::{
|
||||
hooks::{EngineHooks, PruneHook, StaticFileHook},
|
||||
@ -29,32 +27,35 @@ use reth_interfaces::p2p::either::EitherDownloader;
|
||||
use reth_network::NetworkEvents;
|
||||
use reth_node_api::{FullNodeComponents, NodeTypes};
|
||||
use reth_node_core::{
|
||||
cli::config::RethRpcConfig,
|
||||
dirs::{ChainPath, DataDirPath},
|
||||
engine_api_store::EngineApiStore,
|
||||
engine_skip_fcu::EngineApiSkipFcu,
|
||||
exit::NodeExitFuture,
|
||||
init::init_genesis,
|
||||
node_config::NodeConfig,
|
||||
};
|
||||
use reth_node_events::{cl::ConsensusLayerHealthEvents, node};
|
||||
use reth_primitives::format_ether;
|
||||
use reth_provider::{providers::BlockchainProvider, CanonStateSubscriptions, ProviderFactory};
|
||||
use reth_provider::{providers::BlockchainProvider, CanonStateSubscriptions};
|
||||
use reth_prune::PrunerBuilder;
|
||||
use reth_revm::EvmProcessorFactory;
|
||||
use reth_rpc_engine_api::EngineApi;
|
||||
use reth_static_file::StaticFileProducer;
|
||||
use reth_tasks::TaskExecutor;
|
||||
use reth_tracing::tracing::{debug, error, info};
|
||||
use reth_tracing::tracing::{debug, info};
|
||||
use reth_transaction_pool::TransactionPool;
|
||||
use std::{cmp::max, future::Future, sync::Arc, thread::available_parallelism};
|
||||
use std::{future::Future, sync::Arc};
|
||||
use tokio::sync::{mpsc::unbounded_channel, oneshot};
|
||||
|
||||
/// Launches a new node.
|
||||
pub mod common;
|
||||
pub use common::LaunchContext;
|
||||
|
||||
/// A general purpose trait that launches a new node of any kind.
|
||||
///
|
||||
/// Acts as a node factory.
|
||||
///
|
||||
/// This is essentially the launch logic for a node.
|
||||
///
|
||||
/// See also [DefaultNodeLauncher] and [NodeBuilderWithComponents::launch_with]
|
||||
pub trait LaunchNode<Target> {
|
||||
/// The node type that is created.
|
||||
type Node;
|
||||
@ -67,37 +68,13 @@ pub trait LaunchNode<Target> {
|
||||
#[derive(Debug)]
|
||||
pub struct DefaultNodeLauncher {
|
||||
/// The task executor for the node.
|
||||
pub task_executor: TaskExecutor,
|
||||
/// The data directory for the node.
|
||||
pub data_dir: ChainPath<DataDirPath>,
|
||||
pub ctx: LaunchContext,
|
||||
}
|
||||
|
||||
impl DefaultNodeLauncher {
|
||||
/// Create a new instance of the default node launcher.
|
||||
pub fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
|
||||
Self { task_executor, data_dir }
|
||||
}
|
||||
|
||||
/// Loads the reth config with the given datadir root
|
||||
fn load_toml_config(&self, config: &NodeConfig) -> eyre::Result<reth_config::Config> {
|
||||
let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config_path());
|
||||
|
||||
let mut toml_config = confy::load_path::<reth_config::Config>(&config_path)
|
||||
.wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
|
||||
|
||||
info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
|
||||
|
||||
// Update the config with the command line arguments
|
||||
toml_config.peers.trusted_nodes_only = config.network.trusted_only;
|
||||
|
||||
if !config.network.trusted_peers.is_empty() {
|
||||
info!(target: "reth::cli", "Adding trusted nodes");
|
||||
config.network.trusted_peers.iter().for_each(|peer| {
|
||||
toml_config.peers.trusted_nodes.insert(*peer);
|
||||
});
|
||||
}
|
||||
|
||||
Ok(toml_config)
|
||||
Self { ctx: LaunchContext::new(task_executor, data_dir) }
|
||||
}
|
||||
}
|
||||
|
||||
@ -114,6 +91,7 @@ where
|
||||
self,
|
||||
target: NodeBuilderWithComponents<RethFullAdapter<DB, T>, CB>,
|
||||
) -> eyre::Result<Self::Node> {
|
||||
let Self { ctx } = self;
|
||||
let NodeBuilderWithComponents {
|
||||
adapter: NodeTypesAdapter { types, database },
|
||||
components_builder,
|
||||
@ -121,74 +99,53 @@ where
|
||||
config,
|
||||
} = target;
|
||||
|
||||
// get config from file
|
||||
let reth_config = self.load_toml_config(&config)?;
|
||||
// configure globals
|
||||
ctx.configure_globals();
|
||||
|
||||
let Self { task_executor, data_dir } = self;
|
||||
let mut ctx = ctx
|
||||
// load the toml config
|
||||
.with_loaded_toml_config(config)?
|
||||
// attach the database
|
||||
.attach(database.clone())
|
||||
// Create the provider factory
|
||||
.with_provider_factory()?;
|
||||
|
||||
// Raise the fd limit of the process.
|
||||
// Does not do anything on windows.
|
||||
fdlimit::raise_fd_limit()?;
|
||||
|
||||
// Limit the global rayon thread pool, reserving 2 cores for the rest of the system
|
||||
let _ = ThreadPoolBuilder::new()
|
||||
.num_threads(
|
||||
available_parallelism().map_or(25, |cpus| max(cpus.get().saturating_sub(2), 2)),
|
||||
)
|
||||
.build_global()
|
||||
.map_err(|e| error!("Failed to build global thread pool: {:?}", e));
|
||||
|
||||
let provider_factory = ProviderFactory::new(
|
||||
database.clone(),
|
||||
Arc::clone(&config.chain),
|
||||
data_dir.static_files_path(),
|
||||
)?
|
||||
.with_static_files_metrics();
|
||||
info!(target: "reth::cli", "Database opened");
|
||||
|
||||
let prometheus_handle = config.install_prometheus_recorder()?;
|
||||
config
|
||||
.start_metrics_endpoint(
|
||||
prometheus_handle,
|
||||
database.clone(),
|
||||
provider_factory.static_file_provider(),
|
||||
task_executor.clone(),
|
||||
)
|
||||
.await?;
|
||||
ctx.start_prometheus_endpoint().await?;
|
||||
|
||||
debug!(target: "reth::cli", chain=%config.chain.chain,
|
||||
genesis=?config.chain.genesis_hash(), "Initializing genesis");
|
||||
debug!(target: "reth::cli", chain=%ctx.chain_id(), genesis=?ctx.genesis_hash(), "Initializing genesis");
|
||||
|
||||
let genesis_hash = init_genesis(provider_factory.clone())?;
|
||||
init_genesis(ctx.provider_factory().clone())?;
|
||||
|
||||
info!(target: "reth::cli", "\n{}", config.chain.display_hardforks());
|
||||
info!(target: "reth::cli", "\n{}", ctx.chain_spec().display_hardforks());
|
||||
|
||||
// setup the consensus instance
|
||||
let consensus: Arc<dyn Consensus> = if config.dev.dev {
|
||||
Arc::new(AutoSealConsensus::new(Arc::clone(&config.chain)))
|
||||
let consensus: Arc<dyn Consensus> = if ctx.is_dev() {
|
||||
Arc::new(AutoSealConsensus::new(ctx.chain_spec()))
|
||||
} else {
|
||||
Arc::new(BeaconConsensus::new(Arc::clone(&config.chain)))
|
||||
Arc::new(BeaconConsensus::new(ctx.chain_spec()))
|
||||
};
|
||||
|
||||
debug!(target: "reth::cli", "Spawning stages metrics listener task");
|
||||
let (sync_metrics_tx, sync_metrics_rx) = unbounded_channel();
|
||||
let sync_metrics_listener = reth_stages::MetricsListener::new(sync_metrics_rx);
|
||||
task_executor.spawn_critical("stages metrics listener task", sync_metrics_listener);
|
||||
ctx.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
|
||||
|
||||
let prune_config = config.prune_config()?.or_else(|| reth_config.prune.clone());
|
||||
let prune_config = ctx.prune_config()?;
|
||||
|
||||
// Configure the blockchain tree for the node
|
||||
let evm_config = types.evm_config();
|
||||
let tree_config = BlockchainTreeConfig::default();
|
||||
let tree_externals = TreeExternals::new(
|
||||
provider_factory.clone(),
|
||||
ctx.provider_factory().clone(),
|
||||
consensus.clone(),
|
||||
EvmProcessorFactory::new(config.chain.clone(), evm_config.clone()),
|
||||
EvmProcessorFactory::new(ctx.chain_spec(), evm_config.clone()),
|
||||
);
|
||||
let tree = BlockchainTree::new(
|
||||
tree_externals,
|
||||
tree_config,
|
||||
prune_config.as_ref().map(|config| config.segments.clone()),
|
||||
prune_config.as_ref().map(|prune| prune.segments.clone()),
|
||||
)?
|
||||
.with_sync_metrics_tx(sync_metrics_tx.clone());
|
||||
|
||||
@ -197,40 +154,30 @@ genesis=?config.chain.genesis_hash(), "Initializing genesis");
|
||||
debug!(target: "reth::cli", "configured blockchain tree");
|
||||
|
||||
// fetch the head block from the database
|
||||
let head =
|
||||
config.lookup_head(provider_factory.clone()).wrap_err("the head block is missing")?;
|
||||
let head = ctx.lookup_head()?;
|
||||
|
||||
// setup the blockchain provider
|
||||
let blockchain_db =
|
||||
BlockchainProvider::new(provider_factory.clone(), blockchain_tree.clone())?;
|
||||
BlockchainProvider::new(ctx.provider_factory().clone(), blockchain_tree.clone())?;
|
||||
|
||||
let ctx = BuilderContext::new(
|
||||
let builder_ctx = BuilderContext::new(
|
||||
head,
|
||||
blockchain_db,
|
||||
task_executor,
|
||||
data_dir,
|
||||
config,
|
||||
reth_config,
|
||||
blockchain_db.clone(),
|
||||
ctx.task_executor().clone(),
|
||||
ctx.data_dir().clone(),
|
||||
ctx.node_config().clone(),
|
||||
ctx.toml_config().clone(),
|
||||
evm_config.clone(),
|
||||
);
|
||||
|
||||
debug!(target: "reth::cli", "creating components");
|
||||
let components = components_builder.build_components(&ctx).await?;
|
||||
|
||||
let BuilderContext {
|
||||
provider: blockchain_db,
|
||||
executor,
|
||||
data_dir,
|
||||
mut config,
|
||||
mut reth_config,
|
||||
..
|
||||
} = ctx;
|
||||
let components = components_builder.build_components(&builder_ctx).await?;
|
||||
|
||||
let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
|
||||
|
||||
let node_adapter = NodeAdapter {
|
||||
components,
|
||||
task_executor: executor.clone(),
|
||||
task_executor: ctx.task_executor().clone(),
|
||||
provider: blockchain_db.clone(),
|
||||
evm: evm_config.clone(),
|
||||
};
|
||||
@ -250,16 +197,16 @@ genesis=?config.chain.genesis_hash(), "Initializing genesis");
|
||||
let context = ExExContext {
|
||||
head,
|
||||
provider: blockchain_db.clone(),
|
||||
task_executor: executor.clone(),
|
||||
data_dir: data_dir.clone(),
|
||||
config: config.clone(),
|
||||
reth_config: reth_config.clone(),
|
||||
task_executor: ctx.task_executor().clone(),
|
||||
data_dir: ctx.data_dir().clone(),
|
||||
config: ctx.node_config().clone(),
|
||||
reth_config: ctx.toml_config().clone(),
|
||||
pool: node_adapter.components.pool().clone(),
|
||||
events,
|
||||
notifications,
|
||||
};
|
||||
|
||||
let executor = executor.clone();
|
||||
let executor = ctx.task_executor().clone();
|
||||
exexs.push(async move {
|
||||
debug!(target: "reth::cli", id, "spawning exex");
|
||||
let span = reth_tracing::tracing::info_span!("exex", id);
|
||||
@ -287,21 +234,24 @@ genesis=?config.chain.genesis_hash(), "Initializing genesis");
|
||||
// todo(onbjerg): rm magic number
|
||||
let exex_manager = ExExManager::new(exex_handles, 1024);
|
||||
let exex_manager_handle = exex_manager.handle();
|
||||
executor.spawn_critical("exex manager", async move {
|
||||
ctx.task_executor().spawn_critical("exex manager", async move {
|
||||
exex_manager.await.expect("exex manager crashed");
|
||||
});
|
||||
|
||||
// send notifications from the blockchain tree to exex manager
|
||||
let mut canon_state_notifications = blockchain_tree.subscribe_to_canonical_state();
|
||||
let mut handle = exex_manager_handle.clone();
|
||||
executor.spawn_critical("exex manager blockchain tree notifications", async move {
|
||||
ctx.task_executor().spawn_critical(
|
||||
"exex manager blockchain tree notifications",
|
||||
async move {
|
||||
while let Ok(notification) = canon_state_notifications.recv().await {
|
||||
handle.send_async(notification.into()).await.expect(
|
||||
"blockchain tree notification could not be sent to exex
|
||||
manager",
|
||||
);
|
||||
}
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
info!(target: "reth::cli", "ExEx Manager started");
|
||||
|
||||
@ -314,52 +264,59 @@ manager",
|
||||
let network_client = node_adapter.network().fetch_client().await?;
|
||||
let (consensus_engine_tx, mut consensus_engine_rx) = unbounded_channel();
|
||||
|
||||
if let Some(skip_fcu_threshold) = config.debug.skip_fcu {
|
||||
if let Some(skip_fcu_threshold) = ctx.node_config().debug.skip_fcu {
|
||||
debug!(target: "reth::cli", "spawning skip FCU task");
|
||||
let (skip_fcu_tx, skip_fcu_rx) = unbounded_channel();
|
||||
let engine_skip_fcu = EngineApiSkipFcu::new(skip_fcu_threshold);
|
||||
executor.spawn_critical(
|
||||
ctx.task_executor().spawn_critical(
|
||||
"skip FCU interceptor",
|
||||
engine_skip_fcu.intercept(consensus_engine_rx, skip_fcu_tx),
|
||||
);
|
||||
consensus_engine_rx = skip_fcu_rx;
|
||||
}
|
||||
|
||||
if let Some(store_path) = config.debug.engine_api_store.clone() {
|
||||
if let Some(store_path) = ctx.node_config().debug.engine_api_store.clone() {
|
||||
debug!(target: "reth::cli", "spawning engine API store");
|
||||
let (engine_intercept_tx, engine_intercept_rx) = unbounded_channel();
|
||||
let engine_api_store = EngineApiStore::new(store_path);
|
||||
executor.spawn_critical(
|
||||
ctx.task_executor().spawn_critical(
|
||||
"engine api interceptor",
|
||||
engine_api_store.intercept(consensus_engine_rx, engine_intercept_tx),
|
||||
);
|
||||
consensus_engine_rx = engine_intercept_rx;
|
||||
};
|
||||
|
||||
let max_block = config.max_block(network_client.clone(), provider_factory.clone()).await?;
|
||||
let max_block = ctx
|
||||
.node_config()
|
||||
.max_block(network_client.clone(), ctx.provider_factory().clone())
|
||||
.await?;
|
||||
let mut hooks = EngineHooks::new();
|
||||
|
||||
let static_file_producer = StaticFileProducer::new(
|
||||
provider_factory.clone(),
|
||||
provider_factory.static_file_provider(),
|
||||
ctx.provider_factory().clone(),
|
||||
ctx.static_file_provider(),
|
||||
prune_config.clone().unwrap_or_default().segments,
|
||||
);
|
||||
let static_file_producer_events = static_file_producer.lock().events();
|
||||
hooks.add(StaticFileHook::new(static_file_producer.clone(), Box::new(executor.clone())));
|
||||
hooks.add(StaticFileHook::new(
|
||||
static_file_producer.clone(),
|
||||
Box::new(ctx.task_executor().clone()),
|
||||
));
|
||||
info!(target: "reth::cli", "StaticFileProducer initialized");
|
||||
|
||||
// Make sure ETL doesn't default to /tmp/, but to whatever datadir is set to
|
||||
if reth_config.stages.etl.dir.is_none() {
|
||||
reth_config.stages.etl.dir = Some(EtlConfig::from_datadir(&data_dir.data_dir_path()));
|
||||
if ctx.toml_config_mut().stages.etl.dir.is_none() {
|
||||
ctx.toml_config_mut().stages.etl.dir =
|
||||
Some(EtlConfig::from_datadir(&ctx.data_dir().data_dir_path()));
|
||||
}
|
||||
|
||||
// Configure the pipeline
|
||||
let pipeline_exex_handle =
|
||||
exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty);
|
||||
let (mut pipeline, client) = if config.dev.dev {
|
||||
let (mut pipeline, client) = if ctx.is_dev() {
|
||||
info!(target: "reth::cli", "Starting Reth in dev mode");
|
||||
|
||||
for (idx, (address, alloc)) in config.chain.genesis.alloc.iter().enumerate() {
|
||||
for (idx, (address, alloc)) in ctx.chain_spec().genesis.alloc.iter().enumerate() {
|
||||
info!(target: "reth::cli", "Allocated Genesis Account: {:02}. {} ({} ETH)", idx,
|
||||
address.to_string(), format_ether(alloc.balance));
|
||||
}
|
||||
@ -368,9 +325,9 @@ address.to_string(), format_ether(alloc.balance));
|
||||
let pending_transactions_listener =
|
||||
node_adapter.components.pool().pending_transactions_listener();
|
||||
|
||||
let mining_mode = if let Some(interval) = config.dev.block_time {
|
||||
let mining_mode = if let Some(interval) = ctx.node_config().dev.block_time {
|
||||
MiningMode::interval(interval)
|
||||
} else if let Some(max_transactions) = config.dev.block_max_transactions {
|
||||
} else if let Some(max_transactions) = ctx.node_config().dev.block_max_transactions {
|
||||
MiningMode::instant(max_transactions, pending_transactions_listener)
|
||||
} else {
|
||||
info!(target: "reth::cli", "No mining mode specified, defaulting to
|
||||
@ -379,7 +336,7 @@ ReadyTransaction");
|
||||
};
|
||||
|
||||
let (_, client, mut task) = reth_auto_seal_consensus::AutoSealBuilder::new(
|
||||
Arc::clone(&config.chain),
|
||||
ctx.chain_spec(),
|
||||
blockchain_db.clone(),
|
||||
node_adapter.components.pool().clone(),
|
||||
consensus_engine_tx.clone(),
|
||||
@ -390,12 +347,12 @@ ReadyTransaction");
|
||||
.build();
|
||||
|
||||
let mut pipeline = crate::setup::build_networked_pipeline(
|
||||
&config,
|
||||
&reth_config.stages,
|
||||
ctx.node_config(),
|
||||
&ctx.toml_config().stages,
|
||||
client.clone(),
|
||||
Arc::clone(&consensus),
|
||||
provider_factory.clone(),
|
||||
&executor,
|
||||
ctx.provider_factory().clone(),
|
||||
ctx.task_executor(),
|
||||
sync_metrics_tx,
|
||||
prune_config.clone(),
|
||||
max_block,
|
||||
@ -408,17 +365,17 @@ ReadyTransaction");
|
||||
let pipeline_events = pipeline.events();
|
||||
task.set_pipeline_events(pipeline_events);
|
||||
debug!(target: "reth::cli", "Spawning auto mine task");
|
||||
executor.spawn(Box::pin(task));
|
||||
ctx.task_executor().spawn(Box::pin(task));
|
||||
|
||||
(pipeline, EitherDownloader::Left(client))
|
||||
} else {
|
||||
let pipeline = crate::setup::build_networked_pipeline(
|
||||
&config,
|
||||
&reth_config.stages,
|
||||
ctx.node_config(),
|
||||
&ctx.toml_config().stages,
|
||||
network_client.clone(),
|
||||
Arc::clone(&consensus),
|
||||
provider_factory.clone(),
|
||||
&executor,
|
||||
ctx.provider_factory().clone(),
|
||||
ctx.task_executor(),
|
||||
sync_metrics_tx,
|
||||
prune_config.clone(),
|
||||
max_block,
|
||||
@ -433,22 +390,22 @@ ReadyTransaction");
|
||||
|
||||
let pipeline_events = pipeline.events();
|
||||
|
||||
let initial_target = config.initial_pipeline_target(genesis_hash);
|
||||
let initial_target = ctx.initial_pipeline_target();
|
||||
|
||||
let prune_config = prune_config.unwrap_or_default();
|
||||
let mut pruner_builder = PrunerBuilder::new(prune_config.clone())
|
||||
.max_reorg_depth(tree_config.max_reorg_depth() as usize)
|
||||
.prune_delete_limit(config.chain.prune_delete_limit)
|
||||
.prune_delete_limit(ctx.chain_spec().prune_delete_limit)
|
||||
.timeout(PrunerBuilder::DEFAULT_TIMEOUT);
|
||||
if let Some(exex_manager_handle) = &exex_manager_handle {
|
||||
pruner_builder =
|
||||
pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
|
||||
}
|
||||
|
||||
let mut pruner = pruner_builder.build(provider_factory.clone());
|
||||
let mut pruner = pruner_builder.build(ctx.provider_factory().clone());
|
||||
|
||||
let pruner_events = pruner.events();
|
||||
hooks.add(PruneHook::new(pruner, Box::new(executor.clone())));
|
||||
hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor().clone())));
|
||||
info!(target: "reth::cli", ?prune_config, "Pruner initialized");
|
||||
|
||||
// Configure the consensus engine
|
||||
@ -456,10 +413,10 @@ ReadyTransaction");
|
||||
client,
|
||||
pipeline,
|
||||
blockchain_db.clone(),
|
||||
Box::new(executor.clone()),
|
||||
Box::new(ctx.task_executor().clone()),
|
||||
Box::new(node_adapter.components.network().clone()),
|
||||
max_block,
|
||||
config.debug.continuous,
|
||||
ctx.node_config().debug.continuous,
|
||||
node_adapter.components.payload_builder().clone(),
|
||||
initial_target,
|
||||
reth_beacon_consensus::MIN_BLOCKS_FOR_PIPELINE_RUN,
|
||||
@ -473,7 +430,7 @@ ReadyTransaction");
|
||||
node_adapter.components.network().event_listener().map(Into::into),
|
||||
beacon_engine_handle.event_listener().map(Into::into),
|
||||
pipeline_events.map(Into::into),
|
||||
if config.debug.tip.is_none() && !config.dev.dev {
|
||||
if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() {
|
||||
Either::Left(
|
||||
ConsensusLayerHealthEvents::new(Box::new(blockchain_db.clone()))
|
||||
.map(Into::into),
|
||||
@ -484,7 +441,7 @@ ReadyTransaction");
|
||||
pruner_events.map(Into::into),
|
||||
static_file_producer_events.map(Into::into)
|
||||
);
|
||||
executor.spawn_critical(
|
||||
ctx.task_executor().spawn_critical(
|
||||
"events task",
|
||||
node::handle_events(
|
||||
Some(node_adapter.components.network().clone()),
|
||||
@ -496,39 +453,38 @@ ReadyTransaction");
|
||||
|
||||
let engine_api = EngineApi::new(
|
||||
blockchain_db.clone(),
|
||||
config.chain.clone(),
|
||||
ctx.chain_spec(),
|
||||
beacon_engine_handle,
|
||||
node_adapter.components.payload_builder().clone().into(),
|
||||
Box::new(executor.clone()),
|
||||
Box::new(ctx.task_executor().clone()),
|
||||
);
|
||||
info!(target: "reth::cli", "Engine API handler initialized");
|
||||
|
||||
// extract the jwt secret from the args if possible
|
||||
let default_jwt_path = data_dir.jwt_path();
|
||||
let jwt_secret = config.rpc.auth_jwt_secret(default_jwt_path)?;
|
||||
let jwt_secret = ctx.auth_jwt_secret()?;
|
||||
|
||||
// adjust rpc port numbers based on instance number
|
||||
config.adjust_instance_ports();
|
||||
ctx.node_config_mut().adjust_instance_ports();
|
||||
|
||||
// Start RPC servers
|
||||
let (rpc_server_handles, mut rpc_registry) = crate::rpc::launch_rpc_servers(
|
||||
node_adapter.clone(),
|
||||
engine_api,
|
||||
&config,
|
||||
ctx.node_config(),
|
||||
jwt_secret,
|
||||
rpc,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// in dev mode we generate 20 random dev-signer accounts
|
||||
if config.dev.dev {
|
||||
if ctx.is_dev() {
|
||||
rpc_registry.eth_api().with_dev_accounts();
|
||||
}
|
||||
|
||||
// Run consensus engine to completion
|
||||
let (tx, rx) = oneshot::channel();
|
||||
info!(target: "reth::cli", "Starting consensus engine");
|
||||
executor.spawn_critical_blocking("consensus engine", async move {
|
||||
ctx.task_executor().spawn_critical_blocking("consensus engine", async move {
|
||||
let res = beacon_consensus_engine.await;
|
||||
let _ = tx.send(res);
|
||||
});
|
||||
@ -539,11 +495,11 @@ ReadyTransaction");
|
||||
network: node_adapter.components.network().clone(),
|
||||
provider: node_adapter.provider.clone(),
|
||||
payload_builder: node_adapter.components.payload_builder().clone(),
|
||||
task_executor: executor,
|
||||
task_executor: ctx.task_executor().clone(),
|
||||
rpc_server_handles,
|
||||
rpc_registry,
|
||||
config,
|
||||
data_dir,
|
||||
config: ctx.node_config().clone(),
|
||||
data_dir: ctx.data_dir().clone(),
|
||||
};
|
||||
// Notify on node started
|
||||
on_node_started.on_event(full_node.clone())?;
|
||||
Reference in New Issue
Block a user