diff --git a/crates/consensus/beacon/src/engine/invalid_headers.rs b/crates/consensus/beacon/src/engine/invalid_headers.rs index ebce1faf9..fbe6bf462 100644 --- a/crates/consensus/beacon/src/engine/invalid_headers.rs +++ b/crates/consensus/beacon/src/engine/invalid_headers.rs @@ -23,7 +23,8 @@ pub struct InvalidHeaderCache { } impl InvalidHeaderCache { - pub(crate) fn new(max_length: u32) -> Self { + /// Invalid header cache constructor. + pub fn new(max_length: u32) -> Self { Self { headers: LruMap::new(ByLength::new(max_length)), metrics: Default::default() } } diff --git a/crates/engine/tree/src/engine.rs b/crates/engine/tree/src/engine.rs index 25d4fabf7..9b965e892 100644 --- a/crates/engine/tree/src/engine.rs +++ b/crates/engine/tree/src/engine.rs @@ -3,6 +3,7 @@ use crate::{ chain::{ChainHandler, FromOrchestrator, HandlerEvent}, download::{BlockDownloader, DownloadAction, DownloadOutcome}, + tree::TreeEvent, }; use futures::{Stream, StreamExt}; use reth_beacon_consensus::BeaconEngineMessage; @@ -150,7 +151,6 @@ pub struct EngineApiRequestHandler { to_tree: Sender>>, /// channel to receive messages from the tree. from_tree: UnboundedReceiver, - // TODO add db controller } impl EngineApiRequestHandler @@ -178,13 +178,16 @@ where } fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { - todo!("poll tree and handle db") + todo!("poll tree") } } /// Events emitted by the engine API handler. #[derive(Debug)] -pub enum EngineApiEvent {} +pub enum EngineApiEvent { + /// Bubbled from tree. + FromTree(TreeEvent), +} #[derive(Debug)] pub enum FromEngine { diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index a4ccea510..9ac472961 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -1,12 +1,18 @@ -use crate::{backfill::BackfillAction, engine::DownloadRequest}; -use reth_beacon_consensus::{ForkchoiceStateTracker, InvalidHeaderCache, OnForkChoiceUpdated}; +use crate::{ + backfill::BackfillAction, + chain::FromOrchestrator, + engine::{DownloadRequest, EngineApiEvent, FromEngine}, +}; +use reth_beacon_consensus::{ + BeaconEngineMessage, ForkchoiceStateTracker, InvalidHeaderCache, OnForkChoiceUpdated, +}; use reth_blockchain_tree::{ error::InsertBlockErrorKind, BlockAttachment, BlockBuffer, BlockStatus, }; use reth_blockchain_tree_api::{error::InsertBlockError, InsertPayloadOk}; use reth_consensus::{Consensus, PostExecutionInput}; use reth_engine_primitives::EngineTypes; -use reth_errors::{ConsensusError, ProviderResult}; +use reth_errors::{ConsensusError, ProviderResult, RethError}; use reth_evm::execute::{BlockExecutorProvider, Executor}; use reth_payload_primitives::PayloadTypes; use reth_payload_validator::ExecutionPayloadValidator; @@ -27,8 +33,9 @@ use reth_trie::{updates::TrieUpdates, HashedPostState}; use std::{ collections::{BTreeMap, HashMap}, marker::PhantomData, - sync::Arc, + sync::{mpsc::Receiver, Arc}, }; +use tokio::sync::mpsc::UnboundedSender; use tracing::*; mod memory_overlay; @@ -72,7 +79,7 @@ impl ExecutedBlock { } /// Keeps track of the state of the tree. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct TreeState { /// All executed blocks by hash. blocks_by_hash: HashMap, @@ -129,11 +136,22 @@ pub struct EngineApiTreeState { invalid_headers: InvalidHeaderCache, } +impl EngineApiTreeState { + fn new(block_buffer_limit: u32, max_invalid_header_cache_length: u32) -> Self { + Self { + invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length), + buffer: BlockBuffer::new(block_buffer_limit), + tree_state: TreeState::default(), + forkchoice_state_tracker: ForkchoiceStateTracker::default(), + } + } +} + /// The type responsible for processing engine API requests. /// /// TODO: design: should the engine handler functions also accept the response channel or return the /// result and the caller redirects the response -pub trait EngineApiTreeHandler: Send + Sync { +pub trait EngineApiTreeHandler { /// The engine type that this handler is for. type Engine: EngineTypes; @@ -170,7 +188,7 @@ pub trait EngineApiTreeHandler: Send + Sync { &mut self, state: ForkchoiceState, attrs: Option<::PayloadAttributes>, - ) -> TreeOutcome>; + ) -> TreeOutcome>; } /// The outcome of a tree operation. @@ -220,6 +238,8 @@ pub struct EngineApiTreeHandlerImpl { consensus: Arc, payload_validator: ExecutionPayloadValidator, state: EngineApiTreeState, + incoming: Receiver>>, + outgoing: UnboundedSender, /// (tmp) The flag indicating whether the pipeline is active. is_pipeline_active: bool, _marker: PhantomData, @@ -227,10 +247,101 @@ pub struct EngineApiTreeHandlerImpl { impl EngineApiTreeHandlerImpl where - P: BlockReader + StateProviderFactory, + P: BlockReader + StateProviderFactory + Clone + 'static, E: BlockExecutorProvider, - T: EngineTypes, + T: EngineTypes + 'static, { + #[allow(clippy::too_many_arguments)] + fn new( + provider: P, + executor_provider: E, + consensus: Arc, + payload_validator: ExecutionPayloadValidator, + incoming: Receiver>>, + outgoing: UnboundedSender, + state: EngineApiTreeState, + ) -> Self { + Self { + provider, + executor_provider, + consensus, + payload_validator, + incoming, + outgoing, + is_pipeline_active: false, + state, + _marker: PhantomData, + } + } + + #[allow(clippy::too_many_arguments)] + fn spawn_new( + provider: P, + executor_provider: E, + consensus: Arc, + payload_validator: ExecutionPayloadValidator, + incoming: Receiver>>, + state: EngineApiTreeState, + ) -> UnboundedSender { + let (outgoing, rx) = tokio::sync::mpsc::unbounded_channel(); + let task = Self::new( + provider, + executor_provider, + consensus, + payload_validator, + incoming, + outgoing.clone(), + state, + ); + std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap(); + outgoing + } + + fn run(mut self) { + loop { + while let Ok(msg) = self.incoming.recv() { + match msg { + FromEngine::Event(event) => match event { + FromOrchestrator::BackfillSyncFinished => { + todo!() + } + FromOrchestrator::BackfillSyncStarted => { + todo!() + } + }, + FromEngine::Request(request) => match request { + BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { + let output = self.on_forkchoice_updated(state, payload_attrs); + if let Err(err) = tx.send(output.outcome) { + error!("Failed to send event: {err:?}"); + } + } + BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => { + let output = self.on_new_payload(payload, cancun_fields); + if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(|e| { + reth_beacon_consensus::BeaconOnNewPayloadError::Internal(Box::new( + e, + )) + })) { + error!("Failed to send event: {err:?}"); + } + } + BeaconEngineMessage::TransitionConfigurationExchanged => { + todo!() + } + }, + FromEngine::DownloadedBlocks(blocks) => { + if let Some(event) = self.on_downloaded(blocks) { + if let Err(err) = self.outgoing.send(EngineApiEvent::FromTree(event)) { + error!("Failed to send event: {err:?}"); + } + } + } + } + } + } + } + /// Return block from database or in-memory state by hash. fn block_by_hash(&self, hash: B256) -> ProviderResult> { // check database first @@ -463,9 +574,9 @@ where impl EngineApiTreeHandler for EngineApiTreeHandlerImpl where - P: BlockReader + StateProviderFactory + Clone, + P: BlockReader + StateProviderFactory + Clone + 'static, E: BlockExecutorProvider, - T: EngineTypes, + T: EngineTypes + 'static, { type Engine = T; @@ -588,7 +699,7 @@ where &mut self, state: ForkchoiceState, attrs: Option<::PayloadAttributes>, - ) -> TreeOutcome> { + ) -> TreeOutcome> { todo!() } }