diff --git a/crates/engine/tree/src/engine.rs b/crates/engine/tree/src/engine.rs index d7dfba4b0..3871a233f 100644 --- a/crates/engine/tree/src/engine.rs +++ b/crates/engine/tree/src/engine.rs @@ -234,6 +234,12 @@ impl From> for EngineApiRequest { } } +impl From> for FromEngine> { + fn from(req: EngineApiRequest) -> Self { + Self::Request(req) + } +} + /// Events emitted by the engine API handler. #[derive(Debug)] pub enum EngineApiEvent { diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs index 39ad86230..83cd599b7 100644 --- a/crates/node/builder/src/launch/engine.rs +++ b/crates/node/builder/src/launch/engine.rs @@ -16,11 +16,14 @@ use reth_beacon_consensus::{ }; use reth_blockchain_tree::BlockchainTreeConfig; use reth_engine_service::service::{ChainEvent, EngineService}; -use reth_engine_tree::tree::TreeConfig; +use reth_engine_tree::{ + engine::{EngineApiRequest, EngineRequestHandler}, + tree::TreeConfig, +}; use reth_exex::ExExManagerHandle; use reth_network::{NetworkSyncUpdater, SyncState}; use reth_network_api::{BlockDownloaderProvider, NetworkEventListenerProvider}; -use reth_node_api::{FullNodeTypes, NodeAddOns}; +use reth_node_api::{BuiltPayload, FullNodeTypes, NodeAddOns}; use reth_node_core::{ dirs::{ChainPath, DataDirPath}, exit::NodeExitFuture, @@ -251,6 +254,14 @@ where // Run consensus engine to completion let initial_target = ctx.initial_backfill_target()?; let network_handle = ctx.components().network().clone(); + let mut built_payloads = ctx + .components() + .payload_builder() + .subscribe() + .await + .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))? + .into_built_payload_stream() + .fuse(); let chainspec = ctx.chain_spec(); let (exit, rx) = oneshot::channel(); info!(target: "reth::cli", "Starting consensus engine"); @@ -262,35 +273,46 @@ where let mut res = Ok(()); - // advance the chain and handle events - while let Some(event) = eth_service.next().await { - debug!(target: "reth::cli", "Event: {event:?}"); - match event { - ChainEvent::BackfillSyncFinished => { - network_handle.update_sync_state(SyncState::Idle); - } - ChainEvent::BackfillSyncStarted => { - network_handle.update_sync_state(SyncState::Syncing); - } - ChainEvent::FatalError => { - error!(target: "reth::cli", "Fatal error in consensus engine"); - res = Err(eyre::eyre!("Fatal error in consensus engine")); - break - } - ChainEvent::Handler(ev) => { - if let Some(head) = ev.canonical_header() { - let head_block = Head { - number: head.number, - hash: head.hash(), - difficulty: head.difficulty, - timestamp: head.timestamp, - total_difficulty: chainspec - .final_paris_total_difficulty(head.number) - .unwrap_or_default(), - }; - network_handle.update_status(head_block); + // advance the chain and await payloads built locally to add into the engine api tree handler to prevent re-execution if that block is received as payload from the CL + loop { + tokio::select! { + payload = built_payloads.select_next_some() => { + if let Some(executed_block) = payload.executed_block() { + debug!(target: "reth::cli", hash=%executed_block.block().hash(), "inserting built payload"); + eth_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into()); + } + } + event = eth_service.next() => { + let Some(event) = event else { break }; + debug!(target: "reth::cli", "Event: {event:?}"); + match event { + ChainEvent::BackfillSyncFinished => { + network_handle.update_sync_state(SyncState::Idle); + } + ChainEvent::BackfillSyncStarted => { + network_handle.update_sync_state(SyncState::Syncing); + } + ChainEvent::FatalError => { + error!(target: "reth::cli", "Fatal error in consensus engine"); + res = Err(eyre::eyre!("Fatal error in consensus engine")); + break + } + ChainEvent::Handler(ev) => { + if let Some(head) = ev.canonical_header() { + let head_block = Head { + number: head.number, + hash: head.hash(), + difficulty: head.difficulty, + timestamp: head.timestamp, + total_difficulty: chainspec + .final_paris_total_difficulty(head.number) + .unwrap_or_default(), + }; + network_handle.update_status(head_block); + } + event_sender.notify(ev); + } } - event_sender.notify(ev); } } }