chore(engine): introduce blockchain tree action (#8029)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Roman Krasiuk
2024-05-02 09:30:44 +02:00
committed by GitHub
parent 2334317dc7
commit 4f002f6ef1

View File

@ -43,8 +43,8 @@ use std::{
time::{Duration, Instant},
};
use tokio::sync::{
mpsc,
mpsc::{UnboundedReceiver, UnboundedSender},
mpsc::{self, UnboundedReceiver, UnboundedSender},
oneshot,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::*;
@ -189,13 +189,11 @@ where
payload_builder: PayloadBuilderHandle<EngineT>,
/// Validator for execution payloads
payload_validator: ExecutionPayloadValidator,
/// Listeners for engine events.
listeners: EventListeners<BeaconConsensusEngineEvent>,
/// Current blockchain tree action.
blockchain_tree_action: Option<BlockchainTreeAction<EngineT>>,
/// Tracks the header of invalid payloads that were rejected by the engine because they're
/// invalid.
invalid_headers: InvalidHeaderCache,
/// Consensus engine metrics.
metrics: EngineMetrics,
/// After downloading a block corresponding to a recent forkchoice update, the engine will
/// check whether or not we can connect the block to the current canonical chain. If we can't,
/// we need to download and execute the missing parents of that block.
@ -209,6 +207,10 @@ where
/// be used to download and execute the missing blocks.
pipeline_run_threshold: u64,
hooks: EngineHooksController,
/// Listeners for engine events.
listeners: EventListeners<BeaconConsensusEngineEvent>,
/// Consensus engine metrics.
metrics: EngineMetrics,
}
impl<DB, BT, Client, EngineT> BeaconConsensusEngine<DB, BT, Client, EngineT>
@ -305,11 +307,12 @@ where
handle: handle.clone(),
forkchoice_state_tracker: Default::default(),
payload_builder,
listeners,
invalid_headers: InvalidHeaderCache::new(MAX_INVALID_HEADERS),
metrics: EngineMetrics::default(),
blockchain_tree_action: None,
pipeline_run_threshold,
hooks: EngineHooksController::new(hooks),
listeners,
metrics: EngineMetrics::default(),
};
let maybe_pipeline_target = match target {
@ -496,35 +499,38 @@ where
&mut self,
state: ForkchoiceState,
attrs: Option<EngineT::PayloadAttributes>,
) -> Result<OnForkChoiceUpdated, CanonicalError> {
tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
) {
self.metrics.forkchoice_updated_messages.increment(1);
self.blockchain.on_forkchoice_update_received(&state);
trace!(target: "consensus::engine", ?state, "Received new forkchoice state update");
// Pre-validate forkchoice state update and return if it's invalid or
// cannot be processed at the moment.
if let Some(on_updated) = self.pre_validate_forkchoice_update(state) {
return Ok(on_updated)
// Pre-validate forkchoice state update and return if it's invalid
// or cannot be processed at the moment.
self.on_forkchoice_updated_status(state, on_updated, tx);
} else {
self.blockchain_tree_action =
Some(BlockchainTreeAction::FcuMakeCanonical { state, attrs, tx });
}
let start = Instant::now();
let make_canonical_result = self.blockchain.make_canonical(state.head_block_hash);
let elapsed = self.record_make_canonical_latency(start, &make_canonical_result);
let status = self.on_forkchoice_updated_make_canonical_result(
state,
attrs,
make_canonical_result,
elapsed,
)?;
trace!(target: "consensus::engine", ?status, ?state, "Returning forkchoice status");
Ok(status)
}
/// Called after the forkchoice update status has been resolved.
/// Depending on the outcome, the method updates the sync state and notifies the listeners
/// about new processed FCU.
fn on_forkchoice_updated_status(&mut self, state: ForkchoiceState, status: ForkchoiceStatus) {
fn on_forkchoice_updated_status(
&mut self,
state: ForkchoiceState,
on_updated: OnForkChoiceUpdated,
tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
) {
// send the response to the CL ASAP
let status = on_updated.forkchoice_status();
let _ = tx.send(Ok(on_updated));
// update the forkchoice state tracker
self.forkchoice_state_tracker.set_latest(state, status);
match status {
ForkchoiceStatus::Invalid => {}
ForkchoiceStatus::Valid => {
@ -1491,17 +1497,17 @@ where
fn on_sync_event(
&mut self,
event: EngineSyncEvent,
) -> Result<SyncEventOutcome, BeaconConsensusEngineError> {
) -> Result<EngineEventOutcome, BeaconConsensusEngineError> {
let outcome = match event {
EngineSyncEvent::FetchedFullBlock(block) => {
self.on_downloaded_block(block);
SyncEventOutcome::Processed
EngineEventOutcome::Processed
}
EngineSyncEvent::PipelineStarted(target) => {
trace!(target: "consensus::engine", ?target, continuous = target.is_none(), "Started the pipeline");
self.metrics.pipeline_runs.increment(1);
self.sync_state_updater.update_sync_state(SyncState::Syncing);
SyncEventOutcome::Processed
EngineEventOutcome::Processed
}
EngineSyncEvent::PipelineFinished { result, reached_max_block } => {
trace!(target: "consensus::engine", ?result, ?reached_max_block, "Pipeline finished");
@ -1509,10 +1515,10 @@ where
let ctrl = result?;
if reached_max_block {
// Terminate the sync early if it's reached the maximum user-configured block.
SyncEventOutcome::ReachedMaxBlock
EngineEventOutcome::ReachedMaxBlock
} else {
self.on_pipeline_outcome(ctrl)?;
SyncEventOutcome::Processed
EngineEventOutcome::Processed
}
}
EngineSyncEvent::PipelineTaskDropped => {
@ -1669,6 +1675,45 @@ where
Ok(())
}
/// Process the outcome of blockchain tree action.
fn on_blockchain_tree_action(
&mut self,
action: BlockchainTreeAction<EngineT>,
) -> RethResult<EngineEventOutcome> {
match action {
BlockchainTreeAction::FcuMakeCanonical { state, attrs, tx } => {
let start = Instant::now();
let result = self.blockchain.make_canonical(state.head_block_hash);
let elapsed = self.record_make_canonical_latency(start, &result);
match self
.on_forkchoice_updated_make_canonical_result(state, attrs, result, elapsed)
{
Ok(on_updated) => {
trace!(target: "consensus::engine", status = ?on_updated, ?state, "Returning forkchoice status");
let fcu_status = on_updated.forkchoice_status();
self.on_forkchoice_updated_status(state, on_updated, tx);
if fcu_status.is_valid() {
let tip_number = self.blockchain.canonical_tip().number;
if self.sync.has_reached_max_block(tip_number) {
// Terminate the sync early if it's reached
// the maximum user configured block.
return Ok(EngineEventOutcome::ReachedMaxBlock)
}
}
}
Err(error) => {
let _ = tx.send(Err(RethError::Canonical(error.clone())));
if error.is_fatal() {
return Err(RethError::Canonical(error))
}
}
};
}
};
Ok(EngineEventOutcome::Processed)
}
}
/// On initialization, the consensus engine will poll the message receiver and return
@ -1711,6 +1756,15 @@ where
continue
}
// Process any blockchain tree action result as set forth during engine message
// processing.
if let Some(action) = this.blockchain_tree_action.take() {
match this.on_blockchain_tree_action(action)? {
EngineEventOutcome::Processed => {}
EngineEventOutcome::ReachedMaxBlock => return Poll::Ready(Ok(())),
};
}
// Process one incoming message from the CL. We don't drain the messages right away,
// because we want to sneak a polling of running hook in between them.
//
@ -1719,34 +1773,7 @@ where
if let Poll::Ready(Some(msg)) = this.engine_message_rx.poll_next_unpin(cx) {
match msg {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
match this.on_forkchoice_updated(state, payload_attrs) {
Ok(on_updated) => {
let fcu_status = on_updated.forkchoice_status();
// update the forkchoice state tracker
this.forkchoice_state_tracker.set_latest(state, fcu_status);
// send the response to the CL ASAP
let _ = tx.send(Ok(on_updated));
if fcu_status.is_valid() {
let tip_number = this.blockchain.canonical_tip().number;
if this.sync.has_reached_max_block(tip_number) {
// Terminate the sync early if it's reached the
// maximum user configured block.
return Poll::Ready(Ok(()))
}
}
this.on_forkchoice_updated_status(state, fcu_status);
}
Err(error) => {
if error.is_fatal() {
// fatal error, we can terminate the future
let _ = tx.send(Err(RethError::Canonical(error.clone())));
return Poll::Ready(Err(RethError::Canonical(error).into()))
}
let _ = tx.send(Err(RethError::Canonical(error)));
}
};
this.on_forkchoice_updated(state, payload_attrs, tx);
}
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => {
this.metrics.new_payload_messages.increment(1);
@ -1770,9 +1797,9 @@ where
if let Poll::Ready(sync_event) = this.sync.poll(cx) {
match this.on_sync_event(sync_event)? {
// Sync event was successfully processed
SyncEventOutcome::Processed => (),
EngineEventOutcome::Processed => (),
// Max block has been reached, exit the engine loop
SyncEventOutcome::ReachedMaxBlock => return Poll::Ready(Ok(())),
EngineEventOutcome::ReachedMaxBlock => return Poll::Ready(Ok(())),
}
// this could have taken a while, so we start the next cycle to handle any new
@ -1806,12 +1833,20 @@ where
}
}
/// Represents outcomes of processing a sync event
enum BlockchainTreeAction<EngineT: EngineTypes> {
FcuMakeCanonical {
state: ForkchoiceState,
attrs: Option<EngineT::PayloadAttributes>,
tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
},
}
/// Represents outcomes of processing an engine event
#[derive(Debug)]
enum SyncEventOutcome {
/// Sync event was processed successfully, engine should continue.
enum EngineEventOutcome {
/// Engine event was processed successfully, engine should continue.
Processed,
/// Sync event was processed successfully and reached max block.
/// Engine event was processed successfully and reached max block.
ReachedMaxBlock,
}