diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index a7761615c..e8b27dc57 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -504,8 +504,10 @@ where // or cannot be processed at the moment. self.on_forkchoice_updated_status(state, on_updated, tx); } else { - self.blockchain_tree_action = - Some(BlockchainTreeAction::FcuMakeCanonical { state, attrs, tx }); + let previous_action = self + .blockchain_tree_action + .replace(BlockchainTreeAction::FcuMakeCanonical { state, attrs, tx }); + debug_assert!(previous_action.is_none(), "Pre-existing action found"); } } @@ -1030,13 +1032,17 @@ where &mut self, payload: ExecutionPayload, cancun_fields: Option, - ) -> Result { + tx: oneshot::Sender>, + ) { + self.metrics.new_payload_messages.increment(1); + let block = match self.ensure_well_formed_payload(payload, cancun_fields) { Ok(block) => block, - Err(status) => return Ok(status), + Err(status) => { + let _ = tx.send(Ok(status)); + return + } }; - let block_hash = block.hash(); - let block_num_hash = block.num_hash(); let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block.hash()); if lowest_buffered_ancestor == block.hash() { @@ -1047,74 +1053,14 @@ where if let Some(status) = self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, block.hash()) { - return Ok(status) + let _ = tx.send(Ok(status)); + return } - let res = if self.sync.is_pipeline_idle() { - // we can only insert new payloads if the pipeline is _not_ running, because it holds - // exclusive access to the database - self.try_insert_new_payload(block) - } else { - self.try_buffer_payload(block) - }; - - let status = match res { - Ok(status) => { - if status.is_valid() { - if let Some(target) = self.forkchoice_state_tracker.sync_target_state() { - // if we're currently syncing and the inserted block is the targeted FCU - // head block, we can try to make it canonical. - if block_hash == target.head_block_hash { - if let Err((_hash, error)) = - self.try_make_sync_target_canonical(block_num_hash) - { - return if error.is_fatal() { - error!(target: "consensus::engine", %error, "Encountered fatal error"); - Err(BeaconOnNewPayloadError::Internal(Box::new(error))) - } else { - // If we could not make the sync target block canonical, we - // should return the error as an invalid payload status. - Ok(PayloadStatus::new( - PayloadStatusEnum::Invalid { - validation_error: error.to_string(), - }, - // TODO: return a proper latest valid hash - // - // See: - self.forkchoice_state_tracker.last_valid_head(), - )) - } - } - } - } - // block was successfully inserted, so we can cancel the full block request, if - // any exists - self.sync.cancel_full_block_request(block_hash); - } - Ok(status) - } - Err(error) => { - warn!(target: "consensus::engine", %error, "Error while processing payload"); - - // If the error was due to an invalid payload, the payload is added to the invalid - // headers cache and `Ok` with [PayloadStatusEnum::Invalid] is returned. - let (block, error) = error.split(); - if error.is_invalid_block() { - warn!(target: "consensus::engine", invalid_hash=?block.hash(), invalid_number=?block.number, %error, "Invalid block error on new payload"); - let latest_valid_hash = - self.latest_valid_hash_for_invalid_payload(block.parent_hash, Some(&error)); - // keep track of the invalid header - self.invalid_headers.insert(block.header); - let status = PayloadStatusEnum::Invalid { validation_error: error.to_string() }; - Ok(PayloadStatus::new(status, latest_valid_hash)) - } else { - Err(BeaconOnNewPayloadError::Internal(Box::new(error))) - } - } - }; - - trace!(target: "consensus::engine", ?status, "Returning payload status"); - status + let previous_action = self + .blockchain_tree_action + .replace(BlockchainTreeAction::InsertNewPayload { block, tx }); + debug_assert!(previous_action.is_none(), "Pre-existing action found"); } /// Ensures that the given payload does not violate any consensus rules that concern the block's @@ -1670,7 +1616,9 @@ where Ok(()) } - /// Process the outcome of blockchain tree action. + /// Process the next set blockchain tree action. + /// The handler might set next blockchain tree action to perform, + /// so the state change should be handled accordingly. fn on_blockchain_tree_action( &mut self, action: BlockchainTreeAction, @@ -1705,6 +1653,84 @@ where } }; } + BlockchainTreeAction::InsertNewPayload { block, tx } => { + let block_hash = block.hash(); + let block_num_hash = block.num_hash(); + let result = if self.sync.is_pipeline_idle() { + // we can only insert new payloads if the pipeline is _not_ running, because it + // holds exclusive access to the database + self.try_insert_new_payload(block) + } else { + self.try_buffer_payload(block) + }; + + let status = match result { + Ok(status) => status, + Err(error) => { + warn!(target: "consensus::engine", %error, "Error while processing payload"); + + let (block, error) = error.split(); + if !error.is_invalid_block() { + // TODO: revise if any error should be considered fatal at this point. + let _ = + tx.send(Err(BeaconOnNewPayloadError::Internal(Box::new(error)))); + return Ok(EngineEventOutcome::Processed) + } + + // If the error was due to an invalid payload, the payload is added to the + // invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is + // returned. + warn!(target: "consensus::engine", invalid_hash=?block.hash(), invalid_number=?block.number, %error, "Invalid block error on new payload"); + let latest_valid_hash = self + .latest_valid_hash_for_invalid_payload(block.parent_hash, Some(&error)); + // keep track of the invalid header + self.invalid_headers.insert(block.header); + PayloadStatus::new( + PayloadStatusEnum::Invalid { validation_error: error.to_string() }, + latest_valid_hash, + ) + } + }; + + if status.is_valid() { + if let Some(target) = self.forkchoice_state_tracker.sync_target_state() { + // if we're currently syncing and the inserted block is the targeted + // FCU head block, we can try to make it canonical. + if block_hash == target.head_block_hash { + if let Err((_hash, error)) = + self.try_make_sync_target_canonical(block_num_hash) + { + if error.is_fatal() { + let response = Err(BeaconOnNewPayloadError::Internal( + Box::new(error.clone()), + )); + let _ = tx.send(response); + return Err(RethError::Canonical(error)) + } + + // If we could not make the sync target block canonical, + // we should return the error as an invalid payload status. + let status = Ok(PayloadStatus::new( + PayloadStatusEnum::Invalid { + validation_error: error.to_string(), + }, + // TODO: return a proper latest valid hash + // See: + self.forkchoice_state_tracker.last_valid_head(), + )); + let _ = tx.send(status); + return Ok(EngineEventOutcome::Processed) + } + } + } + // block was successfully inserted, so we can cancel the full block + // request, if any exists + self.sync.cancel_full_block_request(block_hash); + } + + trace!(target: "consensus::engine", ?status, "Returning payload status"); + let _ = tx.send(Ok(status)); + } }; Ok(EngineEventOutcome::Processed) } @@ -1753,10 +1779,17 @@ where // Process any blockchain tree action result as set forth during engine message // processing. if let Some(action) = this.blockchain_tree_action.take() { - match this.on_blockchain_tree_action(action)? { - EngineEventOutcome::Processed => {} - EngineEventOutcome::ReachedMaxBlock => return Poll::Ready(Ok(())), + match this.on_blockchain_tree_action(action) { + Ok(EngineEventOutcome::Processed) => {} + Ok(EngineEventOutcome::ReachedMaxBlock) => return Poll::Ready(Ok(())), + Err(error) => { + error!(target: "consensus::engine", %error, "Encountered fatal error"); + return Poll::Ready(Err(error.into())) + } }; + + // Blockchain tree action handler might set next action to take. + continue } // Process one incoming message from the CL. We don't drain the messages right away, @@ -1770,9 +1803,7 @@ where this.on_forkchoice_updated(state, payload_attrs, tx); } BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => { - this.metrics.new_payload_messages.increment(1); - let res = this.on_new_payload(payload, cancun_fields); - let _ = tx.send(res); + this.on_new_payload(payload, cancun_fields, tx); } BeaconEngineMessage::TransitionConfigurationExchanged => { this.blockchain.on_transition_configuration_exchanged(); @@ -1833,6 +1864,10 @@ enum BlockchainTreeAction { attrs: Option, tx: oneshot::Sender>, }, + InsertNewPayload { + block: SealedBlock, + tx: oneshot::Sender>, + }, } /// Represents outcomes of processing an engine event