feat: Properly bubble up InsertBlockFatalError (#10276)

This commit is contained in:
frostburn
2024-08-26 15:35:39 -07:00
committed by GitHub
parent 498ced8be9
commit 1c427c6c6a

View File

@ -518,7 +518,10 @@ where
loop {
match self.try_recv_engine_message() {
Ok(Some(msg)) => {
self.on_engine_message(msg);
if let Err(fatal) = self.on_engine_message(msg) {
error!(target: "engine", %fatal, "insert block fatal error");
return
}
}
Ok(None) => {
debug!(target: "engine", "received no engine message for some time, while waiting for persistence task to complete");
@ -531,7 +534,7 @@ where
if let Err(err) = self.advance_persistence() {
error!(target: "engine", %err, "Advancing persistence failed");
break
return
}
}
}
@ -540,22 +543,25 @@ where
///
/// If the block count exceeds the configured batch size we're allowed to execute at once, this
/// will execute the first batch and send the remaining blocks back through the channel so that
/// don't block request processing for a long time.
fn on_downloaded(&mut self, mut blocks: Vec<SealedBlockWithSenders>) -> Option<TreeEvent> {
/// block request processing isn't blocked for a long time.
fn on_downloaded(
&mut self,
mut blocks: Vec<SealedBlockWithSenders>,
) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
if blocks.is_empty() {
// nothing to execute
return None
return Ok(None)
}
trace!(target: "engine", block_count = %blocks.len(), "received downloaded blocks");
let batch = self.config.max_execute_block_batch_size().min(blocks.len());
for block in blocks.drain(..batch) {
if let Some(event) = self.on_downloaded_block(block) {
if let Some(event) = self.on_downloaded_block(block)? {
let needs_backfill = event.is_backfill_action();
self.on_tree_event(event);
if needs_backfill {
// can exit early if backfill is needed
return None
return Ok(None)
}
}
}
@ -565,7 +571,7 @@ where
let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
}
None
Ok(None)
}
/// When the Consensus layer receives a new block via the consensus gossip protocol,
@ -660,7 +666,7 @@ where
let status = match status {
InsertPayloadOk2::Inserted(BlockStatus2::Valid) => {
latest_valid_hash = Some(block_hash);
self.try_connect_buffered_blocks(num_hash);
self.try_connect_buffered_blocks(num_hash)?;
PayloadStatusEnum::Valid
}
InsertPayloadOk2::AlreadySeen(BlockStatus2::Valid) => {
@ -902,21 +908,27 @@ where
}
/// Handles a message from the engine.
fn on_engine_message(&mut self, msg: FromEngine<EngineApiRequest<T>>) {
fn on_engine_message(
&mut self,
msg: FromEngine<EngineApiRequest<T>>,
) -> Result<(), InsertBlockFatalError> {
match msg {
FromEngine::Event(event) => match event {
FromOrchestrator::BackfillSyncStarted => {
debug!(target: "consensus::engine", "received backfill sync started event");
self.backfill_sync_state = BackfillSyncState::Active;
Ok(())
}
FromOrchestrator::BackfillSyncFinished(ctrl) => {
self.on_backfill_sync_finished(ctrl);
self.on_backfill_sync_finished(ctrl)?;
Ok(())
}
},
FromEngine::Request(request) => {
match request {
EngineApiRequest::InsertExecutedBlock(block) => {
self.state.tree_state.insert_executed(block);
Ok(())
}
EngineApiRequest::Beacon(request) => {
match request {
@ -944,6 +956,7 @@ where
{
error!("Failed to send event: {err:?}");
}
Ok(())
}
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => {
let output = self.on_new_payload(payload, cancun_fields);
@ -954,21 +967,24 @@ where
})) {
error!("Failed to send event: {err:?}");
}
Ok(())
}
BeaconEngineMessage::TransitionConfigurationExchanged => {
// triggering this hook will record that we received a request from
// the CL
self.canonical_in_memory_state
.on_transition_configuration_exchanged();
Ok(())
}
}
}
}
}
FromEngine::DownloadedBlocks(blocks) => {
if let Some(event) = self.on_downloaded(blocks) {
if let Some(event) = self.on_downloaded(blocks)? {
self.on_tree_event(event);
}
Ok(())
}
}
}
@ -983,7 +999,10 @@ where
/// This will also do the necessary housekeeping of the tree state, this includes:
/// - removing all blocks below the backfill height
/// - resetting the canonical in-memory state
fn on_backfill_sync_finished(&mut self, ctrl: ControlFlow) {
fn on_backfill_sync_finished(
&mut self,
ctrl: ControlFlow,
) -> Result<(), InsertBlockFatalError> {
debug!(target: "consensus::engine", "received backfill sync finished event");
self.backfill_sync_state = BackfillSyncState::Idle;
@ -992,11 +1011,11 @@ where
warn!(target: "consensus::engine", invalid_hash=?bad_block.hash(), invalid_number=?bad_block.number, "Bad block detected in unwind");
// update the `invalid_headers` cache with the new invalid header
self.state.invalid_headers.insert(*bad_block);
return
return Ok(())
}
// backfill height is the block number that the backfill finished at
let Some(backfill_height) = ctrl.block_number() else { return };
let Some(backfill_height) = ctrl.block_number() else { return Ok(()) };
// state house keeping after backfill sync
// remove all executed blocks below the backfill height
@ -1023,11 +1042,11 @@ where
// the backfill height
let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
else {
return
return Ok(())
};
if sync_target_state.finalized_block_hash.is_zero() {
// no finalized block, can't check distance
return
return Ok(())
}
// get the block number of the finalized block, if we have it
let newest_finalized = self
@ -1052,7 +1071,7 @@ where
self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
backfill_target.into(),
)));
return
return Ok(())
};
// try to close the gap by executing buffered blocks that are child blocks of the new head
@ -1385,12 +1404,15 @@ where
/// Attempts to connect any buffered blocks that are connected to the given parent hash.
#[instrument(level = "trace", skip(self), target = "engine")]
fn try_connect_buffered_blocks(&mut self, parent: BlockNumHash) {
fn try_connect_buffered_blocks(
&mut self,
parent: BlockNumHash,
) -> Result<(), InsertBlockFatalError> {
let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
if blocks.is_empty() {
// nothing to append
return
return Ok(())
}
let now = Instant::now();
@ -1410,12 +1432,14 @@ where
debug!(target: "engine", ?err, "failed to connect buffered block to tree");
if let Err(fatal) = self.on_insert_block_error(err) {
warn!(target: "engine", %fatal, "fatal error occurred while connecting buffered blocks");
return Err(fatal)
}
}
}
}
debug!(target: "engine", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
Ok(())
}
/// Attempts to recover the block's senders and then buffers it.
@ -1615,19 +1639,21 @@ where
/// - download more missing blocks
/// - try to canonicalize the target if the `block` is the tracked target (head) block.
#[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number,), target = "engine")]
fn on_downloaded_block(&mut self, block: SealedBlockWithSenders) -> Option<TreeEvent> {
fn on_downloaded_block(
&mut self,
block: SealedBlockWithSenders,
) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
let block_num_hash = block.num_hash();
let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
if self
.check_invalid_ancestor_with_head(lowest_buffered_ancestor, block_num_hash.hash)
.ok()?
.check_invalid_ancestor_with_head(lowest_buffered_ancestor, block_num_hash.hash)?
.is_some()
{
return None
return Ok(None)
}
if !self.backfill_sync_state.is_idle() {
return None
return Ok(None)
}
// try to append the block
@ -1637,12 +1663,12 @@ where
trace!(target: "engine", "appended downloaded sync target block");
// we just inserted the current sync target block, we can try to make it
// canonical
return Some(TreeEvent::TreeAction(TreeAction::MakeCanonical(
return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical(
block_num_hash.hash,
)))
))))
}
trace!(target: "engine", "appended downloaded block");
self.try_connect_buffered_blocks(block_num_hash);
self.try_connect_buffered_blocks(block_num_hash)?;
}
Ok(InsertPayloadOk2::Inserted(BlockStatus2::Disconnected {
head,
@ -1650,7 +1676,11 @@ where
})) => {
// block is not connected to the canonical head, we need to download
// its missing branch first
return self.on_disconnected_downloaded_block(block_num_hash, missing_ancestor, head)
return Ok(self.on_disconnected_downloaded_block(
block_num_hash,
missing_ancestor,
head,
))
}
Ok(InsertPayloadOk2::AlreadySeen(_)) => {
trace!(target: "engine", "downloaded block already executed");
@ -1659,10 +1689,11 @@ where
debug!(target: "engine", err=%err.kind(), "failed to insert downloaded block");
if let Err(fatal) = self.on_insert_block_error(err) {
warn!(target: "engine", %fatal, "fatal error occurred while inserting downloaded block");
return Err(fatal)
}
}
}
None
Ok(None)
}
fn insert_block_without_senders(
@ -2182,14 +2213,16 @@ mod tests {
let fcu_state = self.fcu_state(block_hash);
let (tx, rx) = oneshot::channel();
self.tree.on_engine_message(FromEngine::Request(
BeaconEngineMessage::ForkchoiceUpdated {
state: fcu_state,
payload_attrs: None,
tx,
}
.into(),
));
self.tree
.on_engine_message(FromEngine::Request(
BeaconEngineMessage::ForkchoiceUpdated {
state: fcu_state,
payload_attrs: None,
tx,
}
.into(),
))
.unwrap();
let response = rx.await.unwrap().unwrap().await.unwrap();
match fcu_status.into() {
@ -2379,7 +2412,7 @@ mod tests {
// process the message
let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
test_harness.tree.on_engine_message(msg);
test_harness.tree.on_engine_message(msg).unwrap();
// we now should receive the other batch
let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
@ -2463,18 +2496,21 @@ mod tests {
.with_backfill_state(BackfillSyncState::Active);
let (tx, rx) = oneshot::channel();
test_harness.tree.on_engine_message(FromEngine::Request(
BeaconEngineMessage::ForkchoiceUpdated {
state: ForkchoiceState {
head_block_hash: B256::random(),
safe_block_hash: B256::random(),
finalized_block_hash: B256::random(),
},
payload_attrs: None,
tx,
}
.into(),
));
test_harness
.tree
.on_engine_message(FromEngine::Request(
BeaconEngineMessage::ForkchoiceUpdated {
state: ForkchoiceState {
head_block_hash: B256::random(),
safe_block_hash: B256::random(),
finalized_block_hash: B256::random(),
},
payload_attrs: None,
tx,
}
.into(),
))
.unwrap();
let resp = rx.await.unwrap().unwrap().await.unwrap();
assert!(resp.payload_status.is_syncing());
@ -2530,14 +2566,17 @@ mod tests {
TestHarness::new(HOLESKY.clone()).with_backfill_state(BackfillSyncState::Active);
let (tx, rx) = oneshot::channel();
test_harness.tree.on_engine_message(FromEngine::Request(
BeaconEngineMessage::NewPayload {
payload: payload.clone().into(),
cancun_fields: None,
tx,
}
.into(),
));
test_harness
.tree
.on_engine_message(FromEngine::Request(
BeaconEngineMessage::NewPayload {
payload: payload.clone().into(),
cancun_fields: None,
tx,
}
.into(),
))
.unwrap();
let resp = rx.await.unwrap().unwrap();
assert!(resp.is_syncing());
@ -2909,7 +2948,7 @@ mod tests {
block_number: MIN_BLOCKS_FOR_PIPELINE_RUN + 1,
});
test_harness.tree.on_engine_message(FromEngine::Event(backfill_finished));
test_harness.tree.on_engine_message(FromEngine::Event(backfill_finished)).unwrap();
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
@ -2919,10 +2958,13 @@ mod tests {
_ => panic!("Unexpected event: {:#?}", event),
}
test_harness.tree.on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain
.last()
.unwrap()
.clone()]));
test_harness
.tree
.on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain
.last()
.unwrap()
.clone()]))
.unwrap();
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
@ -2977,9 +3019,12 @@ mod tests {
}
// send message to tell the engine the requested block was downloaded
test_harness.tree.on_engine_message(FromEngine::DownloadedBlocks(vec![
main_chain_backfill_target.clone(),
]));
test_harness
.tree
.on_engine_message(FromEngine::DownloadedBlocks(vec![
main_chain_backfill_target.clone()
]))
.unwrap();
// check that backfill is triggered
let event = test_harness.from_tree_rx.recv().await.unwrap();
@ -3000,11 +3045,12 @@ mod tests {
test_harness.setup_range_insertion_for_valid_chain(backfilled_chain);
// send message to mark backfill finished
test_harness.tree.on_engine_message(FromEngine::Event(
FromOrchestrator::BackfillSyncFinished(ControlFlow::Continue {
block_number: main_chain_backfill_target.number,
}),
));
test_harness
.tree
.on_engine_message(FromEngine::Event(FromOrchestrator::BackfillSyncFinished(
ControlFlow::Continue { block_number: main_chain_backfill_target.number },
)))
.unwrap();
// send fcu to the tip of main
test_harness.fcu_to(main_chain_last_hash, ForkchoiceStatus::Syncing).await;
@ -3020,7 +3066,8 @@ mod tests {
// tell engine main chain tip downloaded
test_harness
.tree
.on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain_last.clone()]));
.on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain_last.clone()]))
.unwrap();
// check download range request
let event = test_harness.from_tree_rx.recv().await.unwrap();
@ -3043,7 +3090,10 @@ mod tests {
test_harness.setup_range_insertion_for_valid_chain(remaining.clone());
// tell engine block range downloaded
test_harness.tree.on_engine_message(FromEngine::DownloadedBlocks(remaining.clone()));
test_harness
.tree
.on_engine_message(FromEngine::DownloadedBlocks(remaining.clone()))
.unwrap();
test_harness.check_fork_chain_insertion(remaining).await;