feat: add entrypoint and main loop to EngineApiTreeHandlerImpl (#9334)

This commit is contained in:
Federico Gimenez
2024-07-08 12:09:15 +02:00
committed by GitHub
parent 61f2505d4d
commit 1b3209ae0e
3 changed files with 131 additions and 16 deletions

View File

@ -3,6 +3,7 @@
use crate::{
chain::{ChainHandler, FromOrchestrator, HandlerEvent},
download::{BlockDownloader, DownloadAction, DownloadOutcome},
tree::TreeEvent,
};
use futures::{Stream, StreamExt};
use reth_beacon_consensus::BeaconEngineMessage;
@ -150,7 +151,6 @@ pub struct EngineApiRequestHandler<T: EngineTypes> {
to_tree: Sender<FromEngine<BeaconEngineMessage<T>>>,
/// channel to receive messages from the tree.
from_tree: UnboundedReceiver<EngineApiEvent>,
// TODO add db controller
}
impl<T> EngineApiRequestHandler<T>
@ -178,13 +178,16 @@ where
}
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent<Self::Event>> {
todo!("poll tree and handle db")
todo!("poll tree")
}
}
/// Events emitted by the engine API handler.
#[derive(Debug)]
pub enum EngineApiEvent {}
pub enum EngineApiEvent {
/// Bubbled from tree.
FromTree(TreeEvent),
}
#[derive(Debug)]
pub enum FromEngine<Req> {

View File

@ -1,12 +1,18 @@
use crate::{backfill::BackfillAction, engine::DownloadRequest};
use reth_beacon_consensus::{ForkchoiceStateTracker, InvalidHeaderCache, OnForkChoiceUpdated};
use crate::{
backfill::BackfillAction,
chain::FromOrchestrator,
engine::{DownloadRequest, EngineApiEvent, FromEngine},
};
use reth_beacon_consensus::{
BeaconEngineMessage, ForkchoiceStateTracker, InvalidHeaderCache, OnForkChoiceUpdated,
};
use reth_blockchain_tree::{
error::InsertBlockErrorKind, BlockAttachment, BlockBuffer, BlockStatus,
};
use reth_blockchain_tree_api::{error::InsertBlockError, InsertPayloadOk};
use reth_consensus::{Consensus, PostExecutionInput};
use reth_engine_primitives::EngineTypes;
use reth_errors::{ConsensusError, ProviderResult};
use reth_errors::{ConsensusError, ProviderResult, RethError};
use reth_evm::execute::{BlockExecutorProvider, Executor};
use reth_payload_primitives::PayloadTypes;
use reth_payload_validator::ExecutionPayloadValidator;
@ -27,8 +33,9 @@ use reth_trie::{updates::TrieUpdates, HashedPostState};
use std::{
collections::{BTreeMap, HashMap},
marker::PhantomData,
sync::Arc,
sync::{mpsc::Receiver, Arc},
};
use tokio::sync::mpsc::UnboundedSender;
use tracing::*;
mod memory_overlay;
@ -72,7 +79,7 @@ impl ExecutedBlock {
}
/// Keeps track of the state of the tree.
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct TreeState {
/// All executed blocks by hash.
blocks_by_hash: HashMap<B256, ExecutedBlock>,
@ -129,11 +136,22 @@ pub struct EngineApiTreeState {
invalid_headers: InvalidHeaderCache,
}
impl EngineApiTreeState {
fn new(block_buffer_limit: u32, max_invalid_header_cache_length: u32) -> Self {
Self {
invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
buffer: BlockBuffer::new(block_buffer_limit),
tree_state: TreeState::default(),
forkchoice_state_tracker: ForkchoiceStateTracker::default(),
}
}
}
/// The type responsible for processing engine API requests.
///
/// TODO: design: should the engine handler functions also accept the response channel or return the
/// result and the caller redirects the response
pub trait EngineApiTreeHandler: Send + Sync {
pub trait EngineApiTreeHandler {
/// The engine type that this handler is for.
type Engine: EngineTypes;
@ -170,7 +188,7 @@ pub trait EngineApiTreeHandler: Send + Sync {
&mut self,
state: ForkchoiceState,
attrs: Option<<Self::Engine as PayloadTypes>::PayloadAttributes>,
) -> TreeOutcome<Result<OnForkChoiceUpdated, String>>;
) -> TreeOutcome<Result<OnForkChoiceUpdated, RethError>>;
}
/// The outcome of a tree operation.
@ -220,6 +238,8 @@ pub struct EngineApiTreeHandlerImpl<P, E, T: EngineTypes> {
consensus: Arc<dyn Consensus>,
payload_validator: ExecutionPayloadValidator,
state: EngineApiTreeState,
incoming: Receiver<FromEngine<BeaconEngineMessage<T>>>,
outgoing: UnboundedSender<EngineApiEvent>,
/// (tmp) The flag indicating whether the pipeline is active.
is_pipeline_active: bool,
_marker: PhantomData<T>,
@ -227,10 +247,101 @@ pub struct EngineApiTreeHandlerImpl<P, E, T: EngineTypes> {
impl<P, E, T> EngineApiTreeHandlerImpl<P, E, T>
where
P: BlockReader + StateProviderFactory,
P: BlockReader + StateProviderFactory + Clone + 'static,
E: BlockExecutorProvider,
T: EngineTypes,
T: EngineTypes + 'static,
{
#[allow(clippy::too_many_arguments)]
fn new(
provider: P,
executor_provider: E,
consensus: Arc<dyn Consensus>,
payload_validator: ExecutionPayloadValidator,
incoming: Receiver<FromEngine<BeaconEngineMessage<T>>>,
outgoing: UnboundedSender<EngineApiEvent>,
state: EngineApiTreeState,
) -> Self {
Self {
provider,
executor_provider,
consensus,
payload_validator,
incoming,
outgoing,
is_pipeline_active: false,
state,
_marker: PhantomData,
}
}
#[allow(clippy::too_many_arguments)]
fn spawn_new(
provider: P,
executor_provider: E,
consensus: Arc<dyn Consensus>,
payload_validator: ExecutionPayloadValidator,
incoming: Receiver<FromEngine<BeaconEngineMessage<T>>>,
state: EngineApiTreeState,
) -> UnboundedSender<EngineApiEvent> {
let (outgoing, rx) = tokio::sync::mpsc::unbounded_channel();
let task = Self::new(
provider,
executor_provider,
consensus,
payload_validator,
incoming,
outgoing.clone(),
state,
);
std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap();
outgoing
}
fn run(mut self) {
loop {
while let Ok(msg) = self.incoming.recv() {
match msg {
FromEngine::Event(event) => match event {
FromOrchestrator::BackfillSyncFinished => {
todo!()
}
FromOrchestrator::BackfillSyncStarted => {
todo!()
}
},
FromEngine::Request(request) => match request {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
let output = self.on_forkchoice_updated(state, payload_attrs);
if let Err(err) = tx.send(output.outcome) {
error!("Failed to send event: {err:?}");
}
}
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => {
let output = self.on_new_payload(payload, cancun_fields);
if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(|e| {
reth_beacon_consensus::BeaconOnNewPayloadError::Internal(Box::new(
e,
))
})) {
error!("Failed to send event: {err:?}");
}
}
BeaconEngineMessage::TransitionConfigurationExchanged => {
todo!()
}
},
FromEngine::DownloadedBlocks(blocks) => {
if let Some(event) = self.on_downloaded(blocks) {
if let Err(err) = self.outgoing.send(EngineApiEvent::FromTree(event)) {
error!("Failed to send event: {err:?}");
}
}
}
}
}
}
}
/// Return block from database or in-memory state by hash.
fn block_by_hash(&self, hash: B256) -> ProviderResult<Option<Block>> {
// check database first
@ -463,9 +574,9 @@ where
impl<P, E, T> EngineApiTreeHandler for EngineApiTreeHandlerImpl<P, E, T>
where
P: BlockReader + StateProviderFactory + Clone,
P: BlockReader + StateProviderFactory + Clone + 'static,
E: BlockExecutorProvider,
T: EngineTypes,
T: EngineTypes + 'static,
{
type Engine = T;
@ -588,7 +699,7 @@ where
&mut self,
state: ForkchoiceState,
attrs: Option<<Self::Engine as PayloadTypes>::PayloadAttributes>,
) -> TreeOutcome<Result<OnForkChoiceUpdated, String>> {
) -> TreeOutcome<Result<OnForkChoiceUpdated, RethError>> {
todo!()
}
}