feat: inject locally built payloads into the tree (#10216)

This commit is contained in:
Matthias Seitz
2024-08-08 23:53:43 +02:00
committed by GitHub
parent 63c71cf385
commit aa278bcdef
2 changed files with 58 additions and 30 deletions

View File

@ -234,6 +234,12 @@ impl<T: EngineTypes> From<BeaconEngineMessage<T>> for EngineApiRequest<T> {
} }
} }
impl<T: EngineTypes> From<EngineApiRequest<T>> for FromEngine<EngineApiRequest<T>> {
fn from(req: EngineApiRequest<T>) -> Self {
Self::Request(req)
}
}
/// Events emitted by the engine API handler. /// Events emitted by the engine API handler.
#[derive(Debug)] #[derive(Debug)]
pub enum EngineApiEvent { pub enum EngineApiEvent {

View File

@ -16,11 +16,14 @@ use reth_beacon_consensus::{
}; };
use reth_blockchain_tree::BlockchainTreeConfig; use reth_blockchain_tree::BlockchainTreeConfig;
use reth_engine_service::service::{ChainEvent, EngineService}; use reth_engine_service::service::{ChainEvent, EngineService};
use reth_engine_tree::tree::TreeConfig; use reth_engine_tree::{
engine::{EngineApiRequest, EngineRequestHandler},
tree::TreeConfig,
};
use reth_exex::ExExManagerHandle; use reth_exex::ExExManagerHandle;
use reth_network::{NetworkSyncUpdater, SyncState}; use reth_network::{NetworkSyncUpdater, SyncState};
use reth_network_api::{BlockDownloaderProvider, NetworkEventListenerProvider}; use reth_network_api::{BlockDownloaderProvider, NetworkEventListenerProvider};
use reth_node_api::{FullNodeTypes, NodeAddOns}; use reth_node_api::{BuiltPayload, FullNodeTypes, NodeAddOns};
use reth_node_core::{ use reth_node_core::{
dirs::{ChainPath, DataDirPath}, dirs::{ChainPath, DataDirPath},
exit::NodeExitFuture, exit::NodeExitFuture,
@ -251,6 +254,14 @@ where
// Run consensus engine to completion // Run consensus engine to completion
let initial_target = ctx.initial_backfill_target()?; let initial_target = ctx.initial_backfill_target()?;
let network_handle = ctx.components().network().clone(); let network_handle = ctx.components().network().clone();
let mut built_payloads = ctx
.components()
.payload_builder()
.subscribe()
.await
.map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
.into_built_payload_stream()
.fuse();
let chainspec = ctx.chain_spec(); let chainspec = ctx.chain_spec();
let (exit, rx) = oneshot::channel(); let (exit, rx) = oneshot::channel();
info!(target: "reth::cli", "Starting consensus engine"); info!(target: "reth::cli", "Starting consensus engine");
@ -262,35 +273,46 @@ where
let mut res = Ok(()); let mut res = Ok(());
// advance the chain and handle events // advance the chain and await payloads built locally to add into the engine api tree handler to prevent re-execution if that block is received as payload from the CL
while let Some(event) = eth_service.next().await { loop {
debug!(target: "reth::cli", "Event: {event:?}"); tokio::select! {
match event { payload = built_payloads.select_next_some() => {
ChainEvent::BackfillSyncFinished => { if let Some(executed_block) = payload.executed_block() {
network_handle.update_sync_state(SyncState::Idle); debug!(target: "reth::cli", hash=%executed_block.block().hash(), "inserting built payload");
} eth_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into());
ChainEvent::BackfillSyncStarted => { }
network_handle.update_sync_state(SyncState::Syncing); }
} event = eth_service.next() => {
ChainEvent::FatalError => { let Some(event) = event else { break };
error!(target: "reth::cli", "Fatal error in consensus engine"); debug!(target: "reth::cli", "Event: {event:?}");
res = Err(eyre::eyre!("Fatal error in consensus engine")); match event {
break ChainEvent::BackfillSyncFinished => {
} network_handle.update_sync_state(SyncState::Idle);
ChainEvent::Handler(ev) => { }
if let Some(head) = ev.canonical_header() { ChainEvent::BackfillSyncStarted => {
let head_block = Head { network_handle.update_sync_state(SyncState::Syncing);
number: head.number, }
hash: head.hash(), ChainEvent::FatalError => {
difficulty: head.difficulty, error!(target: "reth::cli", "Fatal error in consensus engine");
timestamp: head.timestamp, res = Err(eyre::eyre!("Fatal error in consensus engine"));
total_difficulty: chainspec break
.final_paris_total_difficulty(head.number) }
.unwrap_or_default(), ChainEvent::Handler(ev) => {
}; if let Some(head) = ev.canonical_header() {
network_handle.update_status(head_block); let head_block = Head {
number: head.number,
hash: head.hash(),
difficulty: head.difficulty,
timestamp: head.timestamp,
total_difficulty: chainspec
.final_paris_total_difficulty(head.number)
.unwrap_or_default(),
};
network_handle.update_status(head_block);
}
event_sender.notify(ev);
}
} }
event_sender.notify(ev);
} }
} }
} }