feat(engine): process pending fcu after release of db write hook (#8315)

This commit is contained in:
Roman Krasiuk
2024-05-31 16:13:11 +02:00
committed by GitHub
parent 411c961362
commit 256f6a954d

View File

@ -186,6 +186,16 @@ where
payload_validator: ExecutionPayloadValidator,
/// Current blockchain tree action.
blockchain_tree_action: Option<BlockchainTreeAction<EngineT>>,
/// Pending forkchoice update.
/// It is recorded if we cannot process the forkchoice update because
/// a hook with database read-write access is active.
/// This is a temporary solution to always process missed FCUs.
#[allow(clippy::type_complexity)]
pending_forkchoice_update: Option<(
ForkchoiceState,
Option<EngineT::PayloadAttributes>,
oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
)>,
/// Tracks the header of invalid payloads that were rejected by the engine because they're
/// invalid.
invalid_headers: InvalidHeaderCache,
@ -304,6 +314,7 @@ where
payload_builder,
invalid_headers: InvalidHeaderCache::new(MAX_INVALID_HEADERS),
blockchain_tree_action: None,
pending_forkchoice_update: None,
pipeline_run_threshold,
hooks: EngineHooksController::new(hooks),
event_sender,
@ -366,21 +377,6 @@ where
return Ok(Some(OnForkChoiceUpdated::syncing()))
}
if let Some(hook) = self.hooks.active_db_write_hook() {
// We can only process new forkchoice updates if no hook with db write is running,
// since it requires exclusive access to the database
warn!(
target: "consensus::engine",
hook = %hook.name(),
head_block_hash = ?state.head_block_hash,
safe_block_hash = ?state.safe_block_hash,
finalized_block_hash = ?state.finalized_block_hash,
"Hook is in progress, skipping forkchoice update. \
This may affect the performance of your node as a validator."
);
return Ok(Some(OnForkChoiceUpdated::syncing()))
}
Ok(None)
}
@ -508,20 +504,37 @@ where
trace!(target: "consensus::engine", ?state, "Received new forkchoice state update");
match self.pre_validate_forkchoice_update(state) {
Ok(Some(on_updated)) => {
// Pre-validate forkchoice state update and return if it's invalid
// or cannot be processed at the moment.
self.on_forkchoice_updated_status(state, on_updated, tx);
}
Ok(None) => {
self.set_blockchain_tree_action(
BlockchainTreeAction::MakeForkchoiceHeadCanonical { state, attrs, tx },
);
Ok(on_updated_result) => {
if let Some(on_updated) = on_updated_result {
// Pre-validate forkchoice state update and return if it's invalid
// or cannot be processed at the moment.
self.on_forkchoice_updated_status(state, on_updated, tx);
} else if let Some(hook) = self.hooks.active_db_write_hook() {
// We can only process new forkchoice updates if no hook with db write is
// running, since it requires exclusive access to the
// database
let replaced_pending =
self.pending_forkchoice_update.replace((state, attrs, tx));
warn!(
target: "consensus::engine",
hook = %hook.name(),
head_block_hash = ?state.head_block_hash,
safe_block_hash = ?state.safe_block_hash,
finalized_block_hash = ?state.finalized_block_hash,
replaced_pending = ?replaced_pending.map(|(state, _, _)| state),
"Hook is in progress, delaying forkchoice update. \
This may affect the performance of your node as a validator."
);
} else {
self.set_blockchain_tree_action(
BlockchainTreeAction::MakeForkchoiceHeadCanonical { state, attrs, tx },
);
}
}
Err(error) => {
let _ = tx.send(Err(error.into()));
}
};
}
}
/// Called after the forkchoice update status has been resolved.
@ -1827,6 +1840,17 @@ where
continue
}
// If the db write hook is no longer active and we have a pending forkchoice update,
// process it first.
if this.hooks.active_db_write_hook().is_none() {
if let Some((state, attrs, tx)) = this.pending_forkchoice_update.take() {
this.set_blockchain_tree_action(
BlockchainTreeAction::MakeForkchoiceHeadCanonical { state, attrs, tx },
);
continue
}
}
// Process one incoming message from the CL. We don't drain the messages right away,
// because we want to sneak a polling of running hook in between them.
//