diff --git a/Cargo.lock b/Cargo.lock index c1f28262e..612863dee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7108,6 +7108,7 @@ dependencies = [ "reth-network-p2p", "reth-stages-api", "reth-tasks", + "thiserror", "tokio", "tokio-stream", ] @@ -7722,6 +7723,7 @@ dependencies = [ "reth-consensus", "reth-db", "reth-e2e-test-utils", + "reth-ethereum-engine", "reth-ethereum-engine-primitives", "reth-ethereum-payload-builder", "reth-evm-ethereum", @@ -7730,13 +7732,19 @@ dependencies = [ "reth-node-api", "reth-node-builder", "reth-node-core", + "reth-node-events", "reth-payload-builder", "reth-provider", "reth-rpc", + "reth-rpc-engine-api", + "reth-rpc-types", + "reth-tasks", + "reth-tokio-util", "reth-tracing", "reth-transaction-pool", "serde_json", "tokio", + "tokio-stream", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index c0d952fbb..f8a89cf91 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -299,6 +299,7 @@ reth-eth-wire = { path = "crates/net/eth-wire" } reth-eth-wire-types = { path = "crates/net/eth-wire-types" } reth-ethereum-cli = { path = "crates/ethereum/cli" } reth-ethereum-consensus = { path = "crates/ethereum/consensus" } +reth-ethereum-engine = { path = "crates/ethereum/engine" } reth-ethereum-engine-primitives = { path = "crates/ethereum/engine-primitives" } reth-ethereum-forks = { path = "crates/ethereum-forks" } reth-ethereum-payload-builder = { path = "crates/ethereum/payload" } diff --git a/crates/ethereum/engine/Cargo.toml b/crates/ethereum/engine/Cargo.toml index 05fbc4386..492eb16bb 100644 --- a/crates/ethereum/engine/Cargo.toml +++ b/crates/ethereum/engine/Cargo.toml @@ -27,5 +27,8 @@ pin-project.workspace = true tokio = { workspace = true, features = ["sync"] } tokio-stream.workspace = true +# misc +thiserror.workspace = true + [dev-dependencies] reth-engine-tree = { workspace = true, features = ["test-utils"] } diff --git a/crates/ethereum/engine/src/service.rs b/crates/ethereum/engine/src/service.rs index 0abf352ee..bb7e8b06b 100644 --- a/crates/ethereum/engine/src/service.rs +++ b/crates/ethereum/engine/src/service.rs @@ -90,7 +90,8 @@ where } /// Potential error returned by `EthService`. -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] +#[error("Eth service error.")] pub struct EthServiceError {} #[cfg(test)] diff --git a/crates/ethereum/node/Cargo.toml b/crates/ethereum/node/Cargo.toml index e333c7245..2cce8650d 100644 --- a/crates/ethereum/node/Cargo.toml +++ b/crates/ethereum/node/Cargo.toml @@ -14,6 +14,7 @@ workspace = true # reth reth-payload-builder.workspace = true reth-ethereum-engine-primitives.workspace = true +reth-ethereum-engine.workspace = true reth-basic-payload-builder.workspace = true reth-ethereum-payload-builder.workspace = true reth-node-builder.workspace = true @@ -26,10 +27,20 @@ reth-consensus.workspace = true reth-auto-seal-consensus.workspace = true reth-beacon-consensus.workspace = true reth-rpc.workspace = true +reth-rpc-types.workspace = true +reth-rpc-engine-api.workspace = true reth-node-api.workspace = true +reth-tasks.workspace = true +reth-tokio-util.workspace = true +reth-node-events.workspace = true +reth-node-core.workspace = true +reth-exex.workspace = true # misc eyre.workspace = true +tokio = { workspace = true , features = ["sync"]} +tokio-stream.workspace = true +futures.workspace = true [dev-dependencies] reth.workspace = true @@ -41,7 +52,6 @@ reth-node-core.workspace = true reth-e2e-test-utils.workspace = true alloy-primitives.workspace = true alloy-genesis.workspace = true -futures.workspace = true tokio.workspace = true futures-util.workspace = true serde_json.workspace = true diff --git a/crates/ethereum/node/src/launch.rs b/crates/ethereum/node/src/launch.rs new file mode 100644 index 000000000..eb699cea2 --- /dev/null +++ b/crates/ethereum/node/src/launch.rs @@ -0,0 +1,276 @@ +//! Launch the Ethereum node. + +use futures::{future::Either, stream, stream_select, StreamExt}; +use reth_beacon_consensus::{ + hooks::{EngineHooks, PruneHook, StaticFileHook}, + BeaconConsensusEngineHandle, +}; +use reth_ethereum_engine::service::EthService; +use reth_ethereum_engine_primitives::EthEngineTypes; +use reth_exex::ExExManagerHandle; +use reth_network::NetworkEvents; +use reth_node_api::{FullNodeTypes, NodeAddOns}; +use reth_node_builder::{ + hooks::NodeHooks, + rpc::{launch_rpc_servers, EthApiBuilderProvider}, + AddOns, ExExLauncher, FullNode, LaunchContext, LaunchNode, NodeAdapter, + NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter, +}; +use reth_node_core::{ + dirs::{ChainPath, DataDirPath}, + exit::NodeExitFuture, + rpc::eth::{helpers::AddDevSigners, FullEthApiServer}, + version::{CARGO_PKG_VERSION, CLIENT_CODE, NAME_CLIENT, VERGEN_GIT_SHA}, +}; +use reth_node_events::{cl::ConsensusLayerHealthEvents, node}; +use reth_provider::providers::BlockchainProvider; +use reth_rpc_engine_api::{capabilities::EngineCapabilities, EngineApi}; +use reth_rpc_types::engine::ClientVersionV1; +use reth_tasks::TaskExecutor; +use reth_tokio_util::EventSender; +use reth_tracing::tracing::{debug, info}; +use std::sync::mpsc::channel; +use tokio::sync::{mpsc::unbounded_channel, oneshot}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +/// The Ethereum node launcher. +#[derive(Debug)] +pub struct EthNodeLauncher { + /// The task executor for the node. + pub ctx: LaunchContext, +} + +impl EthNodeLauncher { + /// Create a new instance of the ethereum node launcher. + pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath) -> Self { + Self { ctx: LaunchContext::new(task_executor, data_dir) } + } +} + +impl LaunchNode> for EthNodeLauncher +where + T: FullNodeTypes< + Provider = BlockchainProvider<::DB>, + Engine = EthEngineTypes, + >, + CB: NodeComponentsBuilder, + AO: NodeAddOns>, + AO::EthApi: + EthApiBuilderProvider> + FullEthApiServer + AddDevSigners, +{ + type Node = NodeHandle, AO>; + + async fn launch_node( + self, + target: NodeBuilderWithComponents, + ) -> eyre::Result { + let Self { ctx } = self; + let NodeBuilderWithComponents { + adapter: NodeTypesAdapter { database }, + components_builder, + add_ons: AddOns { hooks, rpc, exexs: installed_exex }, + config, + } = target; + let NodeHooks { on_component_initialized, on_node_started, .. } = hooks; + + // setup the launch context + let ctx = ctx + .with_configured_globals() + // load the toml config + .with_loaded_toml_config(config)? + // add resolved peers + .with_resolved_peers().await? + // attach the database + .attach(database.clone()) + // ensure certain settings take effect + .with_adjusted_configs() + // Create the provider factory + .with_provider_factory().await? + .inspect(|_| { + info!(target: "reth::cli", "Database opened"); + }) + .with_prometheus().await? + .inspect(|this| { + debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis"); + }) + .with_genesis()? + .inspect(|this| { + info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks()); + }) + .with_metrics() + // passing FullNodeTypes as type parameter here so that we can build + // later the components. + .with_blockchain_db::()? + .with_components(components_builder, on_component_initialized).await?; + + // spawn exexs + let exex_manager_handle = ExExLauncher::new( + ctx.head(), + ctx.node_adapter().clone(), + installed_exex, + ctx.configs().clone(), + ) + .launch() + .await; + + // create pipeline + let network_client = ctx.components().network().fetch_client().await?; + let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); + + let max_block = ctx.max_block(network_client.clone()).await?; + let mut hooks = EngineHooks::new(); + + let static_file_producer = ctx.static_file_producer(); + let static_file_producer_events = static_file_producer.lock().events(); + hooks.add(StaticFileHook::new( + static_file_producer.clone(), + Box::new(ctx.task_executor().clone()), + )); + info!(target: "reth::cli", "StaticFileProducer initialized"); + + // Configure the pipeline + let pipeline_exex_handle = + exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty); + let pipeline = reth_node_builder::setup::build_networked_pipeline( + &ctx.toml_config().stages, + network_client.clone(), + ctx.consensus(), + ctx.provider_factory().clone(), + ctx.task_executor(), + ctx.sync_metrics_tx(), + ctx.prune_config(), + max_block, + static_file_producer, + ctx.components().block_executor().clone(), + pipeline_exex_handle, + )?; + + let pipeline_events = pipeline.events(); + + // TODO: support --debug.tip + let _initial_target = ctx.node_config().debug.tip; + + let mut pruner_builder = ctx.pruner_builder(); + if let Some(exex_manager_handle) = &exex_manager_handle { + pruner_builder = + pruner_builder.finished_exex_height(exex_manager_handle.finished_height()); + } + let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone()); + + let pruner_events = pruner.events(); + info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized"); + hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor().clone()))); + + let (to_tree_tx, _to_tree_rx) = channel(); + let (_from_tree_tx, from_tree_rx) = unbounded_channel(); + + // Configure the consensus engine + let eth_service = EthService::new( + ctx.chain_spec(), + network_client.clone(), + // to tree + to_tree_tx, + // from tree + from_tree_rx, + UnboundedReceiverStream::new(consensus_engine_rx), + pipeline, + Box::new(ctx.task_executor().clone()), + ); + + let event_sender = EventSender::default(); + + let beacon_engine_handle = + BeaconConsensusEngineHandle::new(consensus_engine_tx, event_sender); + + info!(target: "reth::cli", "Consensus engine initialized"); + + let events = stream_select!( + ctx.components().network().event_listener().map(Into::into), + // TODO get engine events + pipeline_events.map(Into::into), + if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() { + Either::Left( + ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone())) + .map(Into::into), + ) + } else { + Either::Right(stream::empty()) + }, + pruner_events.map(Into::into), + static_file_producer_events.map(Into::into), + ); + ctx.task_executor().spawn_critical( + "events task", + node::handle_events( + Some(Box::new(ctx.components().network().clone())), + Some(ctx.head().number), + events, + database.clone(), + ), + ); + + let client = ClientVersionV1 { + code: CLIENT_CODE, + name: NAME_CLIENT.to_string(), + version: CARGO_PKG_VERSION.to_string(), + commit: VERGEN_GIT_SHA.to_string(), + }; + let engine_api = EngineApi::new( + ctx.blockchain_db().clone(), + ctx.chain_spec(), + beacon_engine_handle, + ctx.components().payload_builder().clone().into(), + Box::new(ctx.task_executor().clone()), + client, + EngineCapabilities::default(), + ); + info!(target: "reth::cli", "Engine API handler initialized"); + + // extract the jwt secret from the args if possible + let jwt_secret = ctx.auth_jwt_secret()?; + + // Start RPC servers + let (rpc_server_handles, rpc_registry) = launch_rpc_servers( + ctx.node_adapter().clone(), + engine_api, + ctx.node_config(), + jwt_secret, + rpc, + ) + .await?; + + // Run consensus engine to completion + let (tx, rx) = oneshot::channel(); + info!(target: "reth::cli", "Starting consensus engine"); + ctx.task_executor().spawn_critical_blocking("consensus engine", async move { + let res = eth_service.await; + let _ = tx.send(res); + }); + + let full_node = FullNode { + evm_config: ctx.components().evm_config().clone(), + block_executor: ctx.components().block_executor().clone(), + pool: ctx.components().pool().clone(), + network: ctx.components().network().clone(), + provider: ctx.node_adapter().provider.clone(), + payload_builder: ctx.components().payload_builder().clone(), + task_executor: ctx.task_executor().clone(), + rpc_server_handles, + rpc_registry, + config: ctx.node_config().clone(), + data_dir: ctx.data_dir().clone(), + }; + // Notify on node started + on_node_started.on_event(full_node.clone())?; + + let handle = NodeHandle { + node_exit_future: NodeExitFuture::new( + async { Ok(rx.await??) }, + full_node.config.debug.terminate, + ), + node: full_node, + }; + + Ok(handle) + } +} diff --git a/crates/ethereum/node/src/lib.rs b/crates/ethereum/node/src/lib.rs index 44ec6836c..e6ba86ead 100644 --- a/crates/ethereum/node/src/lib.rs +++ b/crates/ethereum/node/src/lib.rs @@ -15,3 +15,5 @@ pub use evm::{EthEvmConfig, EthExecutorProvider}; pub mod node; pub use node::EthereumNode; + +pub mod launch; diff --git a/crates/ethereum/node/tests/it/builder.rs b/crates/ethereum/node/tests/it/builder.rs index 8d97db46c..32ebf2d22 100644 --- a/crates/ethereum/node/tests/it/builder.rs +++ b/crates/ethereum/node/tests/it/builder.rs @@ -2,7 +2,11 @@ use reth_db::test_utils::create_test_rw_db; use reth_node_builder::{FullNodeComponents, NodeBuilder, NodeConfig}; -use reth_node_ethereum::node::{EthereumAddOns, EthereumNode}; +use reth_node_ethereum::{ + launch::EthNodeLauncher, + node::{EthereumAddOns, EthereumNode}, +}; +use reth_tasks::TaskManager; #[test] fn test_basic_setup() { @@ -34,6 +38,22 @@ fn test_basic_setup() { .check_launch(); } +#[tokio::test] +async fn test_eth_launcher() { + let tasks = TaskManager::current(); + let config = NodeConfig::test(); + let db = create_test_rw_db(); + let _builder = NodeBuilder::new(config) + .with_database(db) + .with_types::() + .with_components(EthereumNode::components()) + .with_add_ons::() + .launch_with_fn(|builder| { + let launcher = EthNodeLauncher::new(tasks.executor(), builder.config.datadir()); + builder.launch_with(launcher) + }); +} + #[test] fn test_node_setup() { let config = NodeConfig::test(); diff --git a/crates/node/builder/src/builder/states.rs b/crates/node/builder/src/builder/states.rs index 0a357cf05..33ffd7901 100644 --- a/crates/node/builder/src/builder/states.rs +++ b/crates/node/builder/src/builder/states.rs @@ -60,9 +60,9 @@ impl NodeBuilderWithTypes { } /// Container for the node's types and the database the node uses. -pub(crate) struct NodeTypesAdapter { +pub struct NodeTypesAdapter { /// The database type used by the node. - pub(crate) database: T::DB, + pub database: T::DB, } impl NodeTypesAdapter { @@ -152,13 +152,13 @@ pub struct NodeBuilderWithComponents< AO: NodeAddOns>, > { /// All settings for how the node should be configured. - pub(crate) config: NodeConfig, + pub config: NodeConfig, /// Adapter for the underlying node types and database - pub(crate) adapter: NodeTypesAdapter, + pub adapter: NodeTypesAdapter, /// container for type specific components - pub(crate) components_builder: CB, + pub components_builder: CB, /// Additional node extensions. - pub(crate) add_ons: AddOns, AO>, + pub add_ons: AddOns, AO>, } impl NodeBuilderWithComponents diff --git a/crates/node/builder/src/rpc.rs b/crates/node/builder/src/rpc.rs index f6269abde..31403d224 100644 --- a/crates/node/builder/src/rpc.rs +++ b/crates/node/builder/src/rpc.rs @@ -252,7 +252,7 @@ impl<'a, Node: FullNodeComponents, EthApi> RpcContext<'a, Node, EthApi> { } /// Launch the rpc servers. -pub(crate) async fn launch_rpc_servers( +pub async fn launch_rpc_servers( node: Node, engine_api: Engine, config: &NodeConfig,