From 523bfb9c81ad7c2493882d83d0c9a6d0bcb2ce52 Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Mon, 14 Oct 2024 17:21:41 +0400 Subject: [PATCH] feat: refactor and integrate local engine into `EngineNodeLauncher` (#11703) --- Cargo.lock | 17 +- .../consensus/beacon/src/engine/forkchoice.rs | 9 +- crates/e2e-test-utils/Cargo.toml | 1 + crates/engine/local/Cargo.toml | 24 +- crates/engine/local/src/lib.rs | 13 + crates/engine/local/src/miner.rs | 193 +++++++- crates/engine/local/src/payload.rs | 69 ++- crates/engine/local/src/service.rs | 429 +++++------------- crates/engine/service/src/service.rs | 2 +- crates/ethereum/node/tests/e2e/dev.rs | 53 ++- crates/ethereum/node/tests/e2e/utils.rs | 5 - crates/node/builder/Cargo.toml | 1 + crates/node/builder/src/launch/engine.rs | 79 +++- crates/node/events/src/node.rs | 2 +- crates/optimism/node/Cargo.toml | 2 + crates/payload/primitives/src/traits.rs | 9 +- 16 files changed, 504 insertions(+), 404 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3c5639ead..d690cf536 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6971,6 +6971,7 @@ dependencies = [ "reth", "reth-chainspec", "reth-db", + "reth-engine-local", "reth-network-peers", "reth-node-builder", "reth-node-ethereum", @@ -7028,22 +7029,22 @@ dependencies = [ "alloy-rpc-types-engine", "eyre", "futures-util", + "op-alloy-rpc-types-engine", "reth-beacon-consensus", - "reth-chain-state", "reth-chainspec", - "reth-config", - "reth-db", + "reth-consensus", + "reth-engine-primitives", + "reth-engine-service", "reth-engine-tree", "reth-ethereum-engine-primitives", - "reth-exex-test-utils", - "reth-node-types", + "reth-evm", "reth-payload-builder", "reth-payload-primitives", - "reth-primitives", + "reth-payload-validator", "reth-provider", "reth-prune", + "reth-rpc-types-compat", "reth-stages-api", - "reth-tracing", "reth-transaction-pool", "tokio", "tokio-stream", @@ -7809,6 +7810,7 @@ dependencies = [ "reth-db-api", "reth-db-common", "reth-downloaders", + "reth-engine-local", "reth-engine-service", "reth-engine-tree", "reth-engine-util", @@ -8131,6 +8133,7 @@ dependencies = [ "reth-db", "reth-discv5", "reth-e2e-test-utils", + "reth-engine-local", "reth-evm", "reth-network", "reth-node-api", diff --git a/crates/consensus/beacon/src/engine/forkchoice.rs b/crates/consensus/beacon/src/engine/forkchoice.rs index 975c2ee3b..7e49714ba 100644 --- a/crates/consensus/beacon/src/engine/forkchoice.rs +++ b/crates/consensus/beacon/src/engine/forkchoice.rs @@ -150,15 +150,18 @@ pub enum ForkchoiceStatus { } impl ForkchoiceStatus { - pub(crate) const fn is_valid(&self) -> bool { + /// Returns `true` if the forkchoice state is [`ForkchoiceStatus::Valid`]. + pub const fn is_valid(&self) -> bool { matches!(self, Self::Valid) } - pub(crate) const fn is_invalid(&self) -> bool { + /// Returns `true` if the forkchoice state is [`ForkchoiceStatus::Invalid`]. + pub const fn is_invalid(&self) -> bool { matches!(self, Self::Invalid) } - pub(crate) const fn is_syncing(&self) -> bool { + /// Returns `true` if the forkchoice state is [`ForkchoiceStatus::Syncing`]. + pub const fn is_syncing(&self) -> bool { matches!(self, Self::Syncing) } diff --git a/crates/e2e-test-utils/Cargo.toml b/crates/e2e-test-utils/Cargo.toml index a10162e78..2742d7040 100644 --- a/crates/e2e-test-utils/Cargo.toml +++ b/crates/e2e-test-utils/Cargo.toml @@ -13,6 +13,7 @@ workspace = true [dependencies] reth.workspace = true reth-chainspec.workspace = true +reth-engine-local.workspace = true reth-primitives.workspace = true reth-tracing.workspace = true reth-db = { workspace = true, features = ["test-utils"] } diff --git a/crates/engine/local/Cargo.toml b/crates/engine/local/Cargo.toml index d7a5d0509..f22ab1f8d 100644 --- a/crates/engine/local/Cargo.toml +++ b/crates/engine/local/Cargo.toml @@ -11,15 +11,19 @@ exclude.workspace = true [dependencies] # reth reth-beacon-consensus.workspace = true -reth-chain-state.workspace = true +reth-chainspec.workspace = true +reth-consensus.workspace = true +reth-engine-primitives.workspace = true +reth-engine-service.workspace = true reth-engine-tree.workspace = true +reth-evm.workspace = true reth-ethereum-engine-primitives.workspace = true -reth-node-types.workspace = true reth-payload-builder.workspace = true reth-payload-primitives.workspace = true -reth-primitives.workspace = true +reth-payload-validator.workspace = true reth-provider.workspace = true reth-prune.workspace = true +reth-rpc-types-compat.workspace = true reth-transaction-pool.workspace = true reth-stages-api.workspace = true @@ -36,16 +40,10 @@ futures-util.workspace = true eyre.workspace = true tracing.workspace = true -[dev-dependencies] -reth-chainspec.workspace = true -reth-chain-state.workspace = true -reth-config.workspace = true -reth-db = { workspace = true, features = ["test-utils"] } -reth-ethereum-engine-primitives.workspace = true -reth-exex-test-utils.workspace = true -reth-payload-builder = { workspace = true, features = ["test-utils"] } -reth-provider = { workspace = true, features = ["test-utils"] } -reth-tracing.workspace = true +op-alloy-rpc-types-engine = { workspace = true, optional = true } [lints] workspace = true + +[features] +optimism = ["op-alloy-rpc-types-engine"] diff --git a/crates/engine/local/src/lib.rs b/crates/engine/local/src/lib.rs index 1b84c8a11..26c84d50c 100644 --- a/crates/engine/local/src/lib.rs +++ b/crates/engine/local/src/lib.rs @@ -1,4 +1,17 @@ //! A local engine service that can be used to drive a dev chain. + +#![doc( + html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png", + html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256", + issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/" +)] +#![cfg_attr(not(test), warn(unused_crate_dependencies))] +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] + pub mod miner; pub mod payload; pub mod service; + +pub use miner::MiningMode; +pub use payload::LocalPayloadAttributesBuilder; +pub use service::LocalEngineService; diff --git a/crates/engine/local/src/miner.rs b/crates/engine/local/src/miner.rs index de3d8cb8d..e12a2a50d 100644 --- a/crates/engine/local/src/miner.rs +++ b/crates/engine/local/src/miner.rs @@ -1,16 +1,31 @@ //! Contains the implementation of the mining mode for the local engine. -use alloy_primitives::TxHash; +use alloy_primitives::{TxHash, B256}; +use alloy_rpc_types_engine::{CancunPayloadFields, ForkchoiceState}; +use eyre::OptionExt; use futures_util::{stream::Fuse, StreamExt}; +use reth_beacon_consensus::BeaconEngineMessage; +use reth_chainspec::EthereumHardforks; +use reth_engine_primitives::EngineTypes; +use reth_payload_builder::PayloadBuilderHandle; +use reth_payload_primitives::{ + BuiltPayload, PayloadAttributesBuilder, PayloadBuilder, PayloadTypes, +}; +use reth_provider::{BlockReader, ChainSpecProvider}; +use reth_rpc_types_compat::engine::payload::block_to_payload; use reth_transaction_pool::TransactionPool; use std::{ future::Future, pin::Pin, task::{Context, Poll}, - time::Duration, + time::{Duration, UNIX_EPOCH}, +}; +use tokio::{ + sync::{mpsc::UnboundedSender, oneshot}, + time::Interval, }; -use tokio::time::Interval; use tokio_stream::wrappers::ReceiverStream; +use tracing::error; /// A mining mode for the local dev engine. #[derive(Debug)] @@ -58,3 +73,175 @@ impl Future for MiningMode { } } } + +/// Local miner advancing the chain/ +#[derive(Debug)] +pub struct LocalMiner { + /// Provider to read the current tip of the chain. + provider: Provider, + /// The payload attribute builder for the engine + payload_attributes_builder: B, + /// Sender for events to engine. + to_engine: UnboundedSender>, + /// The mining mode for the engine + mode: MiningMode, + /// The payload builder for the engine + payload_builder: PayloadBuilderHandle, + /// Timestamp for the next block. + last_timestamp: u64, + /// Stores latest mined blocks. + last_block_hashes: Vec, +} + +impl LocalMiner +where + EngineT: EngineTypes, + Provider: BlockReader + ChainSpecProvider + 'static, + B: PayloadAttributesBuilder<::PayloadAttributes>, +{ + /// Spawns a new [`LocalMiner`] with the given parameters. + pub fn spawn_new( + provider: Provider, + payload_attributes_builder: B, + to_engine: UnboundedSender>, + mode: MiningMode, + payload_builder: PayloadBuilderHandle, + ) { + let latest_header = + provider.sealed_header(provider.best_block_number().unwrap()).unwrap().unwrap(); + + let miner = Self { + provider, + payload_attributes_builder, + to_engine, + mode, + payload_builder, + last_timestamp: latest_header.timestamp, + last_block_hashes: vec![latest_header.hash()], + }; + + // Spawn the miner + tokio::spawn(miner.run()); + } + + /// Runs the [`LocalMiner`] in a loop, polling the miner and building payloads. + async fn run(mut self) { + let mut fcu_interval = tokio::time::interval(Duration::from_secs(1)); + loop { + tokio::select! { + // Wait for the interval or the pool to receive a transaction + _ = &mut self.mode => { + if let Err(e) = self.advance().await { + error!(target: "engine::local", "Error advancing the chain: {:?}", e); + } + } + // send FCU once in a while + _ = fcu_interval.tick() => { + if let Err(e) = self.update_forkchoice_state().await { + error!(target: "engine::local", "Error updating fork choice: {:?}", e); + } + } + } + } + } + + /// Returns current forkchoice state. + fn forkchoice_state(&self) -> ForkchoiceState { + ForkchoiceState { + head_block_hash: *self.last_block_hashes.last().expect("at least 1 block exists"), + safe_block_hash: *self + .last_block_hashes + .get(self.last_block_hashes.len().saturating_sub(32)) + .expect("at least 1 block exists"), + finalized_block_hash: *self + .last_block_hashes + .get(self.last_block_hashes.len().saturating_sub(64)) + .expect("at least 1 block exists"), + } + } + + /// Sends a FCU to the engine. + async fn update_forkchoice_state(&self) -> eyre::Result<()> { + let (tx, rx) = oneshot::channel(); + self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated { + state: self.forkchoice_state(), + payload_attrs: None, + tx, + })?; + + let res = rx.await??; + if !res.forkchoice_status().is_valid() { + eyre::bail!("Invalid fork choice update") + } + + Ok(()) + } + + /// Generates payload attributes for a new block, passes them to FCU and inserts built payload + /// through newPayload. + async fn advance(&mut self) -> eyre::Result<()> { + let timestamp = std::cmp::max( + self.last_timestamp + 1, + std::time::SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("cannot be earlier than UNIX_EPOCH") + .as_secs(), + ); + + let (tx, rx) = oneshot::channel(); + self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated { + state: self.forkchoice_state(), + payload_attrs: Some(self.payload_attributes_builder.build(timestamp)), + tx, + })?; + + let res = rx.await??.await?; + if !res.payload_status.is_valid() { + eyre::bail!("Invalid payload status") + } + + let payload_id = res.payload_id.ok_or_eyre("No payload id")?; + + // wait for some time to let the payload be built + tokio::time::sleep(Duration::from_millis(200)).await; + + let Some(Ok(payload)) = self.payload_builder.best_payload(payload_id).await else { + eyre::bail!("No payload") + }; + + let block = payload.block(); + + let cancun_fields = + if self.provider.chain_spec().is_cancun_active_at_timestamp(block.timestamp) { + Some(CancunPayloadFields { + parent_beacon_block_root: block.parent_beacon_block_root.unwrap(), + versioned_hashes: block.blob_versioned_hashes().into_iter().copied().collect(), + }) + } else { + None + }; + + let (tx, rx) = oneshot::channel(); + self.to_engine.send(BeaconEngineMessage::NewPayload { + payload: block_to_payload(payload.block().clone()), + cancun_fields, + tx, + })?; + + let res = rx.await??; + + if !res.is_valid() { + eyre::bail!("Invalid payload") + } + + self.last_timestamp = timestamp; + self.last_block_hashes.push(block.hash()); + // ensure we keep at most 64 blocks + if self.last_block_hashes.len() > 64 { + self.last_block_hashes = + self.last_block_hashes.split_off(self.last_block_hashes.len() - 64); + } + + Ok(()) + } +} diff --git a/crates/engine/local/src/payload.rs b/crates/engine/local/src/payload.rs index 4fd49f53f..15d5ff2cf 100644 --- a/crates/engine/local/src/payload.rs +++ b/crates/engine/local/src/payload.rs @@ -2,29 +2,62 @@ //! [`LocalEngineService`](super::service::LocalEngineService). use alloy_primitives::{Address, B256}; +use reth_chainspec::EthereumHardforks; use reth_ethereum_engine_primitives::EthPayloadAttributes; use reth_payload_primitives::PayloadAttributesBuilder; -use std::{convert::Infallible, time::UNIX_EPOCH}; +use std::sync::Arc; /// The attributes builder for local Ethereum payload. #[derive(Debug)] -pub struct EthLocalPayloadAttributesBuilder; +#[non_exhaustive] +pub struct LocalPayloadAttributesBuilder { + chain_spec: Arc, +} -impl PayloadAttributesBuilder for EthLocalPayloadAttributesBuilder { - type PayloadAttributes = EthPayloadAttributes; - type Error = Infallible; - - fn build(&self) -> Result { - let ts = std::time::SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("cannot be earlier than UNIX_EPOCH"); - - Ok(EthPayloadAttributes { - timestamp: ts.as_secs(), - prev_randao: B256::random(), - suggested_fee_recipient: Address::random(), - withdrawals: None, - parent_beacon_block_root: None, - }) +impl LocalPayloadAttributesBuilder { + /// Creates a new instance of the builder. + pub const fn new(chain_spec: Arc) -> Self { + Self { chain_spec } + } +} + +impl PayloadAttributesBuilder + for LocalPayloadAttributesBuilder +where + ChainSpec: Send + Sync + EthereumHardforks + 'static, +{ + fn build(&self, timestamp: u64) -> EthPayloadAttributes { + EthPayloadAttributes { + timestamp, + prev_randao: B256::random(), + suggested_fee_recipient: Address::random(), + withdrawals: if self.chain_spec.is_shanghai_active_at_timestamp(timestamp) { + Some(Default::default()) + } else { + None + }, + parent_beacon_block_root: if self.chain_spec.is_cancun_active_at_timestamp(timestamp) { + Some(B256::random()) + } else { + None + }, + } + } +} + +#[cfg(feature = "optimism")] +impl PayloadAttributesBuilder + for LocalPayloadAttributesBuilder +where + ChainSpec: Send + Sync + EthereumHardforks + 'static, +{ + fn build(&self, timestamp: u64) -> op_alloy_rpc_types_engine::OpPayloadAttributes { + op_alloy_rpc_types_engine::OpPayloadAttributes { + payload_attributes: self.build(timestamp), + transactions: None, + no_tx_pool: None, + gas_limit: None, + eip_1559_params: None, + } } } diff --git a/crates/engine/local/src/service.rs b/crates/engine/local/src/service.rs index c9794ecfa..93a9cf11e 100644 --- a/crates/engine/local/src/service.rs +++ b/crates/engine/local/src/service.rs @@ -6,357 +6,154 @@ //! with a single transaction. The `Interval` mode will initiate block //! building at a fixed interval. -use crate::miner::MiningMode; -use eyre::eyre; -use reth_beacon_consensus::EngineNodeTypes; -use reth_chain_state::{CanonicalInMemoryState, ExecutedBlock, NewCanonicalChain}; -use reth_engine_tree::persistence::PersistenceHandle; -use reth_payload_builder::PayloadBuilderHandle; -use reth_payload_primitives::{ - BuiltPayload, PayloadAttributesBuilder, PayloadBuilder, PayloadBuilderAttributes, PayloadTypes, +use core::fmt; +use std::{ + fmt::{Debug, Formatter}, + pin::Pin, + sync::Arc, + task::{Context, Poll}, }; -use reth_provider::ProviderFactory; + +use crate::miner::{LocalMiner, MiningMode}; +use futures_util::{Stream, StreamExt}; +use reth_beacon_consensus::{BeaconConsensusEngineEvent, BeaconEngineMessage, EngineNodeTypes}; +use reth_chainspec::EthChainSpec; +use reth_consensus::Consensus; +use reth_engine_service::service::EngineMessageStream; +use reth_engine_tree::{ + chain::{ChainEvent, HandlerEvent}, + engine::{ + EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineRequestHandler, FromEngine, + RequestHandlerEvent, + }, + persistence::PersistenceHandle, + tree::{EngineApiTreeHandler, InvalidBlockHook, TreeConfig}, +}; +use reth_evm::execute::BlockExecutorProvider; +use reth_payload_builder::PayloadBuilderHandle; +use reth_payload_primitives::{PayloadAttributesBuilder, PayloadTypes}; +use reth_payload_validator::ExecutionPayloadValidator; +use reth_provider::{providers::BlockchainProvider2, ChainSpecProvider, ProviderFactory}; use reth_prune::PrunerWithFactory; use reth_stages_api::MetricEventsSender; -use tokio::sync::oneshot; -use tracing::debug; +use tokio::sync::mpsc::UnboundedSender; +use tracing::error; /// Provides a local dev service engine that can be used to drive the /// chain forward. -#[derive(Debug)] -pub struct LocalEngineService +/// +/// This service both produces and consumes [`BeaconEngineMessage`]s. This is done to allow +/// modifications of the stream +pub struct LocalEngineService where N: EngineNodeTypes, - B: PayloadAttributesBuilder::PayloadAttributes>, { - /// The payload builder for the engine - payload_builder: PayloadBuilderHandle, - /// The payload attribute builder for the engine - payload_attributes_builder: B, - /// Keep track of the Canonical chain state that isn't persisted on disk yet - canonical_in_memory_state: CanonicalInMemoryState, - /// A handle to the persistence layer - persistence_handle: PersistenceHandle, - /// The mining mode for the engine - mode: MiningMode, + /// Processes requests. + /// + /// This type is responsible for processing incoming requests. + handler: EngineApiRequestHandler>, + /// Receiver for incoming requests (from the engine API endpoint) that need to be processed. + incoming_requests: EngineMessageStream, } -impl LocalEngineService +impl LocalEngineService where N: EngineNodeTypes, - B: PayloadAttributesBuilder::PayloadAttributes>, { /// Constructor for [`LocalEngineService`]. - pub fn new( - payload_builder: PayloadBuilderHandle, - payload_attributes_builder: B, + #[allow(clippy::too_many_arguments)] + pub fn new( + consensus: Arc, + executor_factory: impl BlockExecutorProvider, provider: ProviderFactory, + blockchain_db: BlockchainProvider2, pruner: PrunerWithFactory>, - canonical_in_memory_state: CanonicalInMemoryState, + payload_builder: PayloadBuilderHandle, + tree_config: TreeConfig, + invalid_block_hook: Box, sync_metrics_tx: MetricEventsSender, + to_engine: UnboundedSender>, + from_engine: EngineMessageStream, mode: MiningMode, - ) -> Self { + payload_attributes_builder: B, + ) -> Self + where + B: PayloadAttributesBuilder<::PayloadAttributes>, + { + let chain_spec = provider.chain_spec(); + let engine_kind = + if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum }; + let persistence_handle = PersistenceHandle::spawn_service(provider, pruner, sync_metrics_tx); + let payload_validator = ExecutionPayloadValidator::new(chain_spec); - Self { - payload_builder, - payload_attributes_builder, - canonical_in_memory_state, + let canonical_in_memory_state = blockchain_db.canonical_in_memory_state(); + + let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new( + blockchain_db.clone(), + executor_factory, + consensus, + payload_validator, persistence_handle, - mode, - } - } + payload_builder.clone(), + canonical_in_memory_state, + tree_config, + invalid_block_hook, + engine_kind, + ); - /// Spawn the [`LocalEngineService`] on a tokio green thread. The service will poll the payload - /// builder with two varying modes, [`MiningMode::Instant`] or [`MiningMode::Interval`] - /// which will respectively either execute the block as soon as it finds a - /// transaction in the pool or build the block based on an interval. - pub fn spawn_new( - payload_builder: PayloadBuilderHandle, - payload_attributes_builder: B, - provider: ProviderFactory, - pruner: PrunerWithFactory>, - canonical_in_memory_state: CanonicalInMemoryState, - sync_metrics_tx: MetricEventsSender, - mode: MiningMode, - ) { - let engine = Self::new( - payload_builder, + let handler = EngineApiRequestHandler::new(to_tree_tx, from_tree); + + LocalMiner::spawn_new( + blockchain_db, payload_attributes_builder, - provider, - pruner, - canonical_in_memory_state, - sync_metrics_tx, + to_engine, mode, + payload_builder, ); - // Spawn the engine - tokio::spawn(engine.run()); - } - - /// Runs the [`LocalEngineService`] in a loop, polling the miner and building - /// payloads. - async fn run(mut self) { - loop { - // Wait for the interval or the pool to receive a transaction - (&mut self.mode).await; - - // Start a new payload building job - let executed_block = self.build_and_save_payload().await; - - if executed_block.is_err() { - debug!(target: "local_engine", err = ?executed_block.unwrap_err(), "failed payload building"); - continue - } - let block = executed_block.expect("not error"); - - let res = self.update_canonical_in_memory_state(block); - if res.is_err() { - debug!(target: "local_engine", err = ?res.unwrap_err(), "failed canonical state update"); - } - } - } - - /// Builds a payload by initiating a new payload job via the [`PayloadBuilderHandle`], - /// saving the execution outcome to persistence and returning the executed block. - async fn build_and_save_payload(&self) -> eyre::Result { - let payload_attributes = self.payload_attributes_builder.build()?; - let parent = self.canonical_in_memory_state.get_canonical_head().hash(); - let payload_builder_attributes = - ::PayloadBuilderAttributes::try_new( - parent, - payload_attributes, - ) - .map_err(|_| eyre::eyre!("failed to fetch payload attributes"))?; - - let payload = self - .payload_builder - .send_and_resolve_payload(payload_builder_attributes) - .await? - .await?; - - let executed_block = - payload.executed_block().ok_or_else(|| eyre!("missing executed block"))?; - let (tx, rx) = oneshot::channel(); - - let _ = self.persistence_handle.save_blocks(vec![executed_block.clone()], tx); - - // Wait for the persistence_handle to complete - let _ = rx.await?.ok_or_else(|| eyre!("missing new head"))?; - - Ok(executed_block) - } - - /// Update the canonical in memory state and send notification for a new canon state to - /// all the listeners. - fn update_canonical_in_memory_state(&self, executed_block: ExecutedBlock) -> eyre::Result<()> { - let chain = NewCanonicalChain::Commit { new: vec![executed_block] }; - let tip = chain.tip().header.clone(); - let notification = chain.to_chain_notification(); - - // Update the tracked in-memory state with the new chain - self.canonical_in_memory_state.update_chain(chain); - self.canonical_in_memory_state.set_canonical_head(tip); - - // Sends an event to all active listeners about the new canonical chain - self.canonical_in_memory_state.notify_canon_state(notification); - Ok(()) + Self { handler, incoming_requests: from_engine } } } -#[cfg(test)] -mod tests { - use super::*; - use reth_chainspec::MAINNET; - use reth_config::PruneConfig; - use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir}; - use reth_ethereum_engine_primitives::EthEngineTypes; - use reth_exex_test_utils::TestNode; - use reth_node_types::NodeTypesWithDBAdapter; - use reth_payload_builder::test_utils::spawn_test_payload_service; - use reth_provider::{providers::StaticFileProvider, BlockReader, ProviderFactory}; - use reth_prune::PrunerBuilder; - use reth_transaction_pool::{ - test_utils::{testing_pool, MockTransaction}, - TransactionPool, - }; - use std::{convert::Infallible, time::Duration}; - use tokio::sync::mpsc::unbounded_channel; +impl Stream for LocalEngineService +where + N: EngineNodeTypes, +{ + type Item = ChainEvent; - #[derive(Debug)] - struct TestPayloadAttributesBuilder; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); - impl PayloadAttributesBuilder for TestPayloadAttributesBuilder { - type PayloadAttributes = alloy_rpc_types_engine::PayloadAttributes; - type Error = Infallible; - - fn build(&self) -> Result { - Ok(alloy_rpc_types_engine::PayloadAttributes { - timestamp: 0, - prev_randao: Default::default(), - suggested_fee_recipient: Default::default(), - withdrawals: None, - parent_beacon_block_root: None, - }) + if let Poll::Ready(ev) = this.handler.poll(cx) { + return match ev { + RequestHandlerEvent::HandlerEvent(ev) => match ev { + HandlerEvent::BackfillAction(_) => { + error!(target: "engine::local", "received backfill request in local engine"); + Poll::Ready(Some(ChainEvent::FatalError)) + } + HandlerEvent::Event(ev) => Poll::Ready(Some(ChainEvent::Handler(ev))), + HandlerEvent::FatalError => Poll::Ready(Some(ChainEvent::FatalError)), + }, + RequestHandlerEvent::Download(_) => { + error!(target: "engine::local", "received download request in local engine"); + Poll::Ready(Some(ChainEvent::FatalError)) + } + } } - } - #[tokio::test] - async fn test_local_engine_service_interval() -> eyre::Result<()> { - reth_tracing::init_test_tracing(); + // forward incoming requests to the handler + while let Poll::Ready(Some(req)) = this.incoming_requests.poll_next_unpin(cx) { + this.handler.on_event(FromEngine::Request(req.into())); + } - // Start the provider and the pruner - let (_, static_dir_path) = create_test_static_files_dir(); - let provider = ProviderFactory::>::new( - create_test_rw_db(), - MAINNET.clone(), - StaticFileProvider::read_write(static_dir_path)?, - ); - let pruner = PrunerBuilder::new(PruneConfig::default()) - .build_with_provider_factory(provider.clone()); - - // Create an empty canonical in memory state - let canonical_in_memory_state = CanonicalInMemoryState::empty(); - - // Start the payload builder service - let payload_handle = spawn_test_payload_service::(); - - // Sync metric channel - let (sync_metrics_tx, _) = unbounded_channel(); - - // Launch the LocalEngineService in interval mode - let period = Duration::from_secs(1); - LocalEngineService::spawn_new( - payload_handle, - TestPayloadAttributesBuilder, - provider.clone(), - pruner, - canonical_in_memory_state, - sync_metrics_tx, - MiningMode::interval(period), - ); - - // Check that we have no block for now - let block = provider.block_by_number(0)?; - assert!(block.is_none()); - - // Wait 4 intervals - tokio::time::sleep(2 * period).await; - - // Assert a block has been build - let block = provider.block_by_number(0)?; - assert!(block.is_some()); - - Ok(()) - } - - #[tokio::test] - async fn test_local_engine_service_instant() -> eyre::Result<()> { - reth_tracing::init_test_tracing(); - - // Start the provider and the pruner - let (_, static_dir_path) = create_test_static_files_dir(); - let provider = ProviderFactory::>::new( - create_test_rw_db(), - MAINNET.clone(), - StaticFileProvider::read_write(static_dir_path)?, - ); - let pruner = PrunerBuilder::new(PruneConfig::default()) - .build_with_provider_factory(provider.clone()); - - // Create an empty canonical in memory state - let canonical_in_memory_state = CanonicalInMemoryState::empty(); - - // Start the payload builder service - let payload_handle = spawn_test_payload_service::(); - - // Start a transaction pool - let pool = testing_pool(); - - // Sync metric channel - let (sync_metrics_tx, _) = unbounded_channel(); - - // Launch the LocalEngineService in instant mode - LocalEngineService::spawn_new( - payload_handle, - TestPayloadAttributesBuilder, - provider.clone(), - pruner, - canonical_in_memory_state, - sync_metrics_tx, - MiningMode::instant(pool.clone()), - ); - - // Wait for a small period to assert block building is - // triggered by adding a transaction to the pool - let period = Duration::from_millis(500); - tokio::time::sleep(period).await; - let block = provider.block_by_number(0)?; - assert!(block.is_none()); - - // Add a transaction to the pool - let transaction = MockTransaction::legacy().with_gas_price(10); - pool.add_transaction(Default::default(), transaction).await?; - - // Wait for block building - let period = Duration::from_secs(2); - tokio::time::sleep(period).await; - - // Assert a block has been build - let block = provider.block_by_number(0)?; - assert!(block.is_some()); - - Ok(()) - } - - #[tokio::test] - async fn test_canonical_chain_subscription() -> eyre::Result<()> { - reth_tracing::init_test_tracing(); - - // Start the provider and the pruner - let (_, static_dir_path) = create_test_static_files_dir(); - let provider = ProviderFactory::>::new( - create_test_rw_db(), - MAINNET.clone(), - StaticFileProvider::read_write(static_dir_path)?, - ); - let pruner = PrunerBuilder::new(PruneConfig::default()) - .build_with_provider_factory(provider.clone()); - - // Create an empty canonical in memory state - let canonical_in_memory_state = CanonicalInMemoryState::empty(); - let mut notifications = canonical_in_memory_state.subscribe_canon_state(); - - // Start the payload builder service - let payload_handle = spawn_test_payload_service::(); - - // Start a transaction pool - let pool = testing_pool(); - - // Sync metric channel - let (sync_metrics_tx, _) = unbounded_channel(); - - // Launch the LocalEngineService in instant mode - LocalEngineService::spawn_new( - payload_handle, - TestPayloadAttributesBuilder, - provider.clone(), - pruner, - canonical_in_memory_state, - sync_metrics_tx, - MiningMode::instant(pool.clone()), - ); - - // Add a transaction to the pool - let transaction = MockTransaction::legacy().with_gas_price(10); - pool.add_transaction(Default::default(), transaction).await?; - - // Check a notification is received for block 0 - let res = notifications.recv().await?; - - assert_eq!(res.tip().number, 0); - - Ok(()) + Poll::Pending + } +} + +impl Debug for LocalEngineService { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("LocalEngineService").finish_non_exhaustive() } } diff --git a/crates/engine/service/src/service.rs b/crates/engine/service/src/service.rs index ed9c1aa1c..026476a82 100644 --- a/crates/engine/service/src/service.rs +++ b/crates/engine/service/src/service.rs @@ -31,7 +31,7 @@ use std::{ }; /// Alias for consensus engine stream. -type EngineMessageStream = Pin> + Send + Sync>>; +pub type EngineMessageStream = Pin> + Send + Sync>>; /// Alias for chain orchestrator. type EngineServiceType = ChainOrchestrator< diff --git a/crates/ethereum/node/tests/e2e/dev.rs b/crates/ethereum/node/tests/e2e/dev.rs index 2ef6e08c7..6b4733b6f 100644 --- a/crates/ethereum/node/tests/e2e/dev.rs +++ b/crates/ethereum/node/tests/e2e/dev.rs @@ -3,29 +3,64 @@ use std::sync::Arc; use alloy_genesis::Genesis; use alloy_primitives::{b256, hex}; use futures::StreamExt; -use reth::core::rpc::eth::helpers::EthTransactions; +use reth::{args::DevArgs, core::rpc::eth::helpers::EthTransactions}; use reth_chainspec::ChainSpec; use reth_e2e_test_utils::setup; -use reth_provider::CanonStateSubscriptions; - -use crate::utils::EthNode; +use reth_node_api::{FullNodeComponents, NodeAddOns}; +use reth_node_builder::{EngineNodeLauncher, FullNode, NodeBuilder, NodeConfig, NodeHandle}; +use reth_node_ethereum::{node::EthereumAddOns, EthereumNode}; +use reth_provider::{providers::BlockchainProvider2, CanonStateSubscriptions}; +use reth_tasks::TaskManager; #[tokio::test] async fn can_run_dev_node() -> eyre::Result<()> { reth_tracing::init_test_tracing(); - let (mut nodes, _tasks, _) = setup(1, custom_chain(), true).await?; + let (mut nodes, _tasks, _) = setup::(1, custom_chain(), true).await?; - assert_chain_advances(nodes.pop().unwrap()).await; + assert_chain_advances(nodes.pop().unwrap().inner).await; Ok(()) } -async fn assert_chain_advances(node: EthNode) { - let mut notifications = node.inner.provider.canonical_state_stream(); +#[tokio::test] +async fn can_run_dev_node_new_engine() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + let tasks = TaskManager::current(); + let exec = tasks.executor(); + + let node_config = NodeConfig::test() + .with_chain(custom_chain()) + .with_dev(DevArgs { dev: true, ..Default::default() }); + let NodeHandle { node, .. } = NodeBuilder::new(node_config.clone()) + .testing_node(exec.clone()) + .with_types_and_provider::>() + .with_components(EthereumNode::components()) + .with_add_ons(EthereumAddOns::default()) + .launch_with_fn(|builder| { + let launcher = EngineNodeLauncher::new( + builder.task_executor().clone(), + builder.config().datadir(), + Default::default(), + ); + builder.launch_with(launcher) + }) + .await?; + + assert_chain_advances(node).await; + + Ok(()) +} + +async fn assert_chain_advances(node: FullNode) +where + N: FullNodeComponents, + AddOns: NodeAddOns, +{ + let mut notifications = node.provider.canonical_state_stream(); // submit tx through rpc let raw_tx = hex!("02f876820a28808477359400847735940082520894ab0840c0e43688012c1adb0f5e3fc665188f83d28a029d394a5d630544000080c080a0a044076b7e67b5deecc63f61a8d7913fab86ca365b344b5759d1fe3563b4c39ea019eab979dd000da04dfc72bb0377c092d30fd9e1cab5ae487de49586cc8b0090"); - let eth_api = node.inner.rpc_registry.eth_api(); + let eth_api = node.rpc_registry.eth_api(); let hash = eth_api.send_raw_transaction(raw_tx.into()).await.unwrap(); diff --git a/crates/ethereum/node/tests/e2e/utils.rs b/crates/ethereum/node/tests/e2e/utils.rs index 5a7950999..6e534f5dc 100644 --- a/crates/ethereum/node/tests/e2e/utils.rs +++ b/crates/ethereum/node/tests/e2e/utils.rs @@ -1,12 +1,7 @@ use alloy_primitives::{Address, B256}; use reth::rpc::types::engine::PayloadAttributes; -use reth_e2e_test_utils::NodeHelperType; -use reth_node_ethereum::{node::EthereumAddOns, EthereumNode}; use reth_payload_builder::EthPayloadBuilderAttributes; -/// Ethereum Node Helper type -pub(crate) type EthNode = NodeHelperType; - /// Helper function to create a new eth payload attributes pub(crate) fn eth_payload_attributes(timestamp: u64) -> EthPayloadBuilderAttributes { let attributes = PayloadAttributes { diff --git a/crates/node/builder/Cargo.toml b/crates/node/builder/Cargo.toml index 1bf2ba233..53e53cd2b 100644 --- a/crates/node/builder/Cargo.toml +++ b/crates/node/builder/Cargo.toml @@ -26,6 +26,7 @@ reth-db = { workspace = true, features = ["mdbx"], optional = true } reth-db-api.workspace = true reth-db-common.workspace = true reth-downloaders.workspace = true +reth-engine-local.workspace = true reth-engine-service.workspace = true reth-engine-tree.workspace = true reth-engine-util.workspace = true diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs index 782cc7bbb..f9e26f202 100644 --- a/crates/node/builder/src/launch/engine.rs +++ b/crates/node/builder/src/launch/engine.rs @@ -9,6 +9,7 @@ use reth_beacon_consensus::{ use reth_blockchain_tree::BlockchainTreeConfig; use reth_chainspec::EthChainSpec; use reth_consensus_debug_client::{DebugConsensusClient, EtherscanBlockProvider}; +use reth_engine_local::{LocalEngineService, LocalPayloadAttributesBuilder, MiningMode}; use reth_engine_service::service::{ChainEvent, EngineService}; use reth_engine_tree::{ engine::{EngineApiRequest, EngineRequestHandler}, @@ -18,7 +19,10 @@ use reth_engine_util::EngineMessageStreamExt; use reth_exex::ExExManagerHandle; use reth_network::{NetworkSyncUpdater, SyncState}; use reth_network_api::{BlockDownloaderProvider, NetworkEventListenerProvider}; -use reth_node_api::{BuiltPayload, FullNodeTypes, NodeAddOns, NodeTypesWithEngine}; +use reth_node_api::{ + BuiltPayload, FullNodeTypes, NodeAddOns, NodeTypesWithEngine, PayloadAttributesBuilder, + PayloadTypes, +}; use reth_node_core::{ dirs::{ChainPath, DataDirPath}, exit::NodeExitFuture, @@ -80,6 +84,9 @@ where + FullEthApiServer + AddDevSigners, >, + LocalPayloadAttributesBuilder: PayloadAttributesBuilder< + <::Engine as PayloadTypes>::PayloadAttributes, + >, { type Node = NodeHandle, AO>; @@ -210,23 +217,49 @@ where let pruner_events = pruner.events(); info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized"); - // Configure the consensus engine - let mut eth_service = EngineService::new( - ctx.consensus(), - ctx.components().block_executor().clone(), - ctx.chain_spec(), - network_client.clone(), - Box::pin(consensus_engine_stream), - pipeline, - Box::new(ctx.task_executor().clone()), - ctx.provider_factory().clone(), - ctx.blockchain_db().clone(), - pruner, - ctx.components().payload_builder().clone(), - engine_tree_config, - ctx.invalid_block_hook()?, - ctx.sync_metrics_tx(), - ); + let mut engine_service = if ctx.is_dev() { + let mining_mode = if let Some(block_time) = ctx.node_config().dev.block_time { + MiningMode::interval(block_time) + } else { + MiningMode::instant(ctx.components().pool().clone()) + }; + let eth_service = LocalEngineService::new( + ctx.consensus(), + ctx.components().block_executor().clone(), + ctx.provider_factory().clone(), + ctx.blockchain_db().clone(), + pruner, + ctx.components().payload_builder().clone(), + engine_tree_config, + ctx.invalid_block_hook()?, + ctx.sync_metrics_tx(), + consensus_engine_tx.clone(), + Box::pin(consensus_engine_stream), + mining_mode, + LocalPayloadAttributesBuilder::new(ctx.chain_spec()), + ); + + Either::Left(eth_service) + } else { + let eth_service = EngineService::new( + ctx.consensus(), + ctx.components().block_executor().clone(), + ctx.chain_spec(), + network_client.clone(), + Box::pin(consensus_engine_stream), + pipeline, + Box::new(ctx.task_executor().clone()), + ctx.provider_factory().clone(), + ctx.blockchain_db().clone(), + pruner, + ctx.components().payload_builder().clone(), + engine_tree_config, + ctx.invalid_block_hook()?, + ctx.sync_metrics_tx(), + ); + + Either::Right(eth_service) + }; let event_sender = EventSender::default(); @@ -340,7 +373,9 @@ where ctx.task_executor().spawn_critical("consensus engine", async move { if let Some(initial_target) = initial_target { debug!(target: "reth::cli", %initial_target, "start backfill sync"); - eth_service.orchestrator_mut().start_backfill_sync(initial_target); + if let Either::Right(eth_service) = &mut engine_service { + eth_service.orchestrator_mut().start_backfill_sync(initial_target); + } } let mut res = Ok(()); @@ -351,10 +386,12 @@ where payload = built_payloads.select_next_some() => { if let Some(executed_block) = payload.executed_block() { debug!(target: "reth::cli", block=?executed_block.block().num_hash(), "inserting built payload"); - eth_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into()); + if let Either::Right(eth_service) = &mut engine_service { + eth_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into()); + } } } - event = eth_service.next() => { + event = engine_service.next() => { let Some(event) = event else { break }; debug!(target: "reth::cli", "Event: {event}"); match event { diff --git a/crates/node/events/src/node.rs b/crates/node/events/src/node.rs index c856c0ec9..e10caaee7 100644 --- a/crates/node/events/src/node.rs +++ b/crates/node/events/src/node.rs @@ -504,7 +504,7 @@ where } else if let Some(latest_block) = this.state.latest_block { let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(); - if now - this.state.latest_block_time.unwrap_or(0) > 60 { + if now.saturating_sub(this.state.latest_block_time.unwrap_or(0)) > 60 { // Once we start receiving consensus nodes, don't emit status unless stalled for // 1 minute info!( diff --git a/crates/optimism/node/Cargo.toml b/crates/optimism/node/Cargo.toml index 3c298bea9..4675029b6 100644 --- a/crates/optimism/node/Cargo.toml +++ b/crates/optimism/node/Cargo.toml @@ -13,6 +13,7 @@ workspace = true [dependencies] # reth reth-chainspec.workspace = true +reth-engine-local.workspace = true reth-primitives.workspace = true reth-payload-builder.workspace = true reth-auto-seal-consensus.workspace = true @@ -87,6 +88,7 @@ optimism = [ "reth-revm/optimism", "reth-auto-seal-consensus/optimism", "reth-optimism-rpc/optimism", + "reth-engine-local/optimism", ] asm-keccak = ["reth-primitives/asm-keccak"] test-utils = ["reth-node-builder/test-utils"] diff --git a/crates/payload/primitives/src/traits.rs b/crates/payload/primitives/src/traits.rs index 6ae6361fd..494ed68aa 100644 --- a/crates/payload/primitives/src/traits.rs +++ b/crates/payload/primitives/src/traits.rs @@ -160,12 +160,7 @@ impl PayloadAttributes for OpPayloadAttributes { } /// A builder that can return the current payload attribute. -pub trait PayloadAttributesBuilder: std::fmt::Debug + Send + Sync + 'static { - /// The payload attributes type returned by the builder. - type PayloadAttributes: PayloadAttributes; - /// The error type returned by [`PayloadAttributesBuilder::build`]. - type Error: core::error::Error + Send + Sync; - +pub trait PayloadAttributesBuilder: Send + Sync + 'static { /// Return a new payload attribute from the builder. - fn build(&self) -> Result; + fn build(&self, timestamp: u64) -> Attributes; }