chore(engine): new payload blockchain tree action (#8041)

This commit is contained in:
Roman Krasiuk
2024-05-02 17:22:23 +02:00
committed by GitHub
parent e68ab2f58c
commit 2af2f0ba46

View File

@ -504,8 +504,10 @@ where
// or cannot be processed at the moment.
self.on_forkchoice_updated_status(state, on_updated, tx);
} else {
self.blockchain_tree_action =
Some(BlockchainTreeAction::FcuMakeCanonical { state, attrs, tx });
let previous_action = self
.blockchain_tree_action
.replace(BlockchainTreeAction::FcuMakeCanonical { state, attrs, tx });
debug_assert!(previous_action.is_none(), "Pre-existing action found");
}
}
@ -1030,13 +1032,17 @@ where
&mut self,
payload: ExecutionPayload,
cancun_fields: Option<CancunPayloadFields>,
) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
) {
self.metrics.new_payload_messages.increment(1);
let block = match self.ensure_well_formed_payload(payload, cancun_fields) {
Ok(block) => block,
Err(status) => return Ok(status),
Err(status) => {
let _ = tx.send(Ok(status));
return
}
};
let block_hash = block.hash();
let block_num_hash = block.num_hash();
let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block.hash());
if lowest_buffered_ancestor == block.hash() {
@ -1047,74 +1053,14 @@ where
if let Some(status) =
self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, block.hash())
{
return Ok(status)
let _ = tx.send(Ok(status));
return
}
let res = if self.sync.is_pipeline_idle() {
// we can only insert new payloads if the pipeline is _not_ running, because it holds
// exclusive access to the database
self.try_insert_new_payload(block)
} else {
self.try_buffer_payload(block)
};
let status = match res {
Ok(status) => {
if status.is_valid() {
if let Some(target) = self.forkchoice_state_tracker.sync_target_state() {
// if we're currently syncing and the inserted block is the targeted FCU
// head block, we can try to make it canonical.
if block_hash == target.head_block_hash {
if let Err((_hash, error)) =
self.try_make_sync_target_canonical(block_num_hash)
{
return if error.is_fatal() {
error!(target: "consensus::engine", %error, "Encountered fatal error");
Err(BeaconOnNewPayloadError::Internal(Box::new(error)))
} else {
// If we could not make the sync target block canonical, we
// should return the error as an invalid payload status.
Ok(PayloadStatus::new(
PayloadStatusEnum::Invalid {
validation_error: error.to_string(),
},
// TODO: return a proper latest valid hash
//
// See: <https://github.com/paradigmxyz/reth/issues/7146>
self.forkchoice_state_tracker.last_valid_head(),
))
}
}
}
}
// block was successfully inserted, so we can cancel the full block request, if
// any exists
self.sync.cancel_full_block_request(block_hash);
}
Ok(status)
}
Err(error) => {
warn!(target: "consensus::engine", %error, "Error while processing payload");
// If the error was due to an invalid payload, the payload is added to the invalid
// headers cache and `Ok` with [PayloadStatusEnum::Invalid] is returned.
let (block, error) = error.split();
if error.is_invalid_block() {
warn!(target: "consensus::engine", invalid_hash=?block.hash(), invalid_number=?block.number, %error, "Invalid block error on new payload");
let latest_valid_hash =
self.latest_valid_hash_for_invalid_payload(block.parent_hash, Some(&error));
// keep track of the invalid header
self.invalid_headers.insert(block.header);
let status = PayloadStatusEnum::Invalid { validation_error: error.to_string() };
Ok(PayloadStatus::new(status, latest_valid_hash))
} else {
Err(BeaconOnNewPayloadError::Internal(Box::new(error)))
}
}
};
trace!(target: "consensus::engine", ?status, "Returning payload status");
status
let previous_action = self
.blockchain_tree_action
.replace(BlockchainTreeAction::InsertNewPayload { block, tx });
debug_assert!(previous_action.is_none(), "Pre-existing action found");
}
/// Ensures that the given payload does not violate any consensus rules that concern the block's
@ -1670,7 +1616,9 @@ where
Ok(())
}
/// Process the outcome of blockchain tree action.
/// Process the next set blockchain tree action.
/// The handler might set next blockchain tree action to perform,
/// so the state change should be handled accordingly.
fn on_blockchain_tree_action(
&mut self,
action: BlockchainTreeAction<EngineT>,
@ -1705,6 +1653,84 @@ where
}
};
}
BlockchainTreeAction::InsertNewPayload { block, tx } => {
let block_hash = block.hash();
let block_num_hash = block.num_hash();
let result = if self.sync.is_pipeline_idle() {
// we can only insert new payloads if the pipeline is _not_ running, because it
// holds exclusive access to the database
self.try_insert_new_payload(block)
} else {
self.try_buffer_payload(block)
};
let status = match result {
Ok(status) => status,
Err(error) => {
warn!(target: "consensus::engine", %error, "Error while processing payload");
let (block, error) = error.split();
if !error.is_invalid_block() {
// TODO: revise if any error should be considered fatal at this point.
let _ =
tx.send(Err(BeaconOnNewPayloadError::Internal(Box::new(error))));
return Ok(EngineEventOutcome::Processed)
}
// If the error was due to an invalid payload, the payload is added to the
// invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is
// returned.
warn!(target: "consensus::engine", invalid_hash=?block.hash(), invalid_number=?block.number, %error, "Invalid block error on new payload");
let latest_valid_hash = self
.latest_valid_hash_for_invalid_payload(block.parent_hash, Some(&error));
// keep track of the invalid header
self.invalid_headers.insert(block.header);
PayloadStatus::new(
PayloadStatusEnum::Invalid { validation_error: error.to_string() },
latest_valid_hash,
)
}
};
if status.is_valid() {
if let Some(target) = self.forkchoice_state_tracker.sync_target_state() {
// if we're currently syncing and the inserted block is the targeted
// FCU head block, we can try to make it canonical.
if block_hash == target.head_block_hash {
if let Err((_hash, error)) =
self.try_make_sync_target_canonical(block_num_hash)
{
if error.is_fatal() {
let response = Err(BeaconOnNewPayloadError::Internal(
Box::new(error.clone()),
));
let _ = tx.send(response);
return Err(RethError::Canonical(error))
}
// If we could not make the sync target block canonical,
// we should return the error as an invalid payload status.
let status = Ok(PayloadStatus::new(
PayloadStatusEnum::Invalid {
validation_error: error.to_string(),
},
// TODO: return a proper latest valid hash
// See: <https://github.com/paradigmxyz/reth/issues/7146>
self.forkchoice_state_tracker.last_valid_head(),
));
let _ = tx.send(status);
return Ok(EngineEventOutcome::Processed)
}
}
}
// block was successfully inserted, so we can cancel the full block
// request, if any exists
self.sync.cancel_full_block_request(block_hash);
}
trace!(target: "consensus::engine", ?status, "Returning payload status");
let _ = tx.send(Ok(status));
}
};
Ok(EngineEventOutcome::Processed)
}
@ -1753,10 +1779,17 @@ where
// Process any blockchain tree action result as set forth during engine message
// processing.
if let Some(action) = this.blockchain_tree_action.take() {
match this.on_blockchain_tree_action(action)? {
EngineEventOutcome::Processed => {}
EngineEventOutcome::ReachedMaxBlock => return Poll::Ready(Ok(())),
match this.on_blockchain_tree_action(action) {
Ok(EngineEventOutcome::Processed) => {}
Ok(EngineEventOutcome::ReachedMaxBlock) => return Poll::Ready(Ok(())),
Err(error) => {
error!(target: "consensus::engine", %error, "Encountered fatal error");
return Poll::Ready(Err(error.into()))
}
};
// Blockchain tree action handler might set next action to take.
continue
}
// Process one incoming message from the CL. We don't drain the messages right away,
@ -1770,9 +1803,7 @@ where
this.on_forkchoice_updated(state, payload_attrs, tx);
}
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => {
this.metrics.new_payload_messages.increment(1);
let res = this.on_new_payload(payload, cancun_fields);
let _ = tx.send(res);
this.on_new_payload(payload, cancun_fields, tx);
}
BeaconEngineMessage::TransitionConfigurationExchanged => {
this.blockchain.on_transition_configuration_exchanged();
@ -1833,6 +1864,10 @@ enum BlockchainTreeAction<EngineT: EngineTypes> {
attrs: Option<EngineT::PayloadAttributes>,
tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
},
InsertNewPayload {
block: SealedBlock,
tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
},
}
/// Represents outcomes of processing an engine event