fix(engine): pipeline run conditions (#2193)

This commit is contained in:
Roman Krasiuk
2023-04-12 09:42:01 +03:00
committed by GitHub
parent c5b70f4771
commit e87960ea8d

View File

@ -8,7 +8,7 @@ use reth_interfaces::{
Error, Error,
}; };
use reth_miner::PayloadStore; use reth_miner::PayloadStore;
use reth_primitives::{BlockHash, BlockNumber, Header, SealedBlock, H256}; use reth_primitives::{BlockNumber, Header, SealedBlock, H256};
use reth_rpc_types::engine::{ use reth_rpc_types::engine::{
EngineRpcError, ExecutionPayload, ExecutionPayloadEnvelope, ForkchoiceUpdated, EngineRpcError, ExecutionPayload, ExecutionPayloadEnvelope, ForkchoiceUpdated,
PayloadAttributes, PayloadId, PayloadStatus, PayloadStatusEnum, PayloadAttributes, PayloadId, PayloadStatus, PayloadStatusEnum,
@ -319,11 +319,6 @@ where
.then_some(H256::zero()); .then_some(H256::zero());
let status = match error { let status = match error {
Error::Execution(ExecutorError::PendingBlockIsInFuture { .. }) => { Error::Execution(ExecutorError::PendingBlockIsInFuture { .. }) => {
if let Some(state) = self.forkchoice_state {
if self.get_block_number(state.head_block_hash)?.is_none() {
self.require_pipeline_run(PipelineTarget::Head);
}
}
PayloadStatusEnum::Syncing PayloadStatusEnum::Syncing
} }
error => PayloadStatusEnum::Invalid { validation_error: error.to_string() }, error => PayloadStatusEnum::Invalid { validation_error: error.to_string() },
@ -371,12 +366,23 @@ where
/// If the finalized block is missing from the database, trigger the pipeline run. /// If the finalized block is missing from the database, trigger the pipeline run.
fn restore_tree_if_possible( fn restore_tree_if_possible(
&mut self, &mut self,
finalized_hash: BlockHash, state: ForkchoiceState,
) -> Result<(), reth_interfaces::Error> { ) -> Result<(), reth_interfaces::Error> {
match self.get_block_number(finalized_hash)? { let needs_pipeline_run = match self.get_block_number(state.finalized_block_hash)? {
Some(number) => self.blockchain_tree.restore_canonical_hashes(number)?, Some(number) => {
None => self.require_pipeline_run(PipelineTarget::Head), // Attempt to restore the tree.
self.blockchain_tree.restore_canonical_hashes(number)?;
// After restoring the tree, check if the head block is missing.
self.db
.view(|tx| tx.get::<tables::HeaderNumbers>(state.head_block_hash))??
.is_none()
}
None => true,
}; };
if needs_pipeline_run {
self.require_pipeline_run(PipelineTarget::Head);
}
Ok(()) Ok(())
} }
@ -510,9 +516,7 @@ where
}; };
// Update the state and hashes of the blockchain tree if possible // Update the state and hashes of the blockchain tree if possible
if let Err(error) = if let Err(error) = this.restore_tree_if_possible(forkchoice_state) {
this.restore_tree_if_possible(forkchoice_state.finalized_block_hash)
{
error!(target: "consensus::engine", ?error, "Error restoring blockchain tree"); error!(target: "consensus::engine", ?error, "Error restoring blockchain tree");
return Poll::Ready(Err(error.into())) return Poll::Ready(Err(error.into()))
} }
@ -917,7 +921,6 @@ mod tests {
VecDeque::from([ VecDeque::from([
Ok(ExecOutput { done: true, stage_progress: 0 }), Ok(ExecOutput { done: true, stage_progress: 0 }),
Ok(ExecOutput { done: true, stage_progress: 0 }), Ok(ExecOutput { done: true, stage_progress: 0 }),
Ok(ExecOutput { done: true, stage_progress: 0 }),
]), ]),
Vec::default(), Vec::default(),
); );
@ -928,27 +931,25 @@ mod tests {
let mut engine_rx = spawn_consensus_engine(consensus_engine); let mut engine_rx = spawn_consensus_engine(consensus_engine);
let invalid_forkchoice_state = ForkchoiceState { let next_head = random_block(2, Some(block1.hash), None, Some(0));
head_block_hash: H256::random(), let next_forkchoice_state = ForkchoiceState {
head_block_hash: next_head.hash,
finalized_block_hash: block1.hash, finalized_block_hash: block1.hash,
..Default::default() ..Default::default()
}; };
let rx = env.send_forkchoice_updated(invalid_forkchoice_state); let invalid_rx = env.send_forkchoice_updated(next_forkchoice_state);
let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing);
assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result));
let rx = env.send_forkchoice_updated(invalid_forkchoice_state); // Insert next head immediately after sending forkchoice update
let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); insert_blocks(env.db.as_ref(), [&next_head].into_iter());
assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result));
let rx_valid = env.send_forkchoice_updated(ForkchoiceState {
head_block_hash: H256::random(),
finalized_block_hash: block1.hash,
..Default::default()
});
let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing);
assert_matches!(rx_valid.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); assert_matches!(invalid_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result));
let valid_rx = env.send_forkchoice_updated(next_forkchoice_state);
let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Valid)
.with_latest_valid_hash(next_head.hash);
assert_matches!(valid_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result));
assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty)); assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
} }
@ -972,7 +973,7 @@ mod tests {
let block1 = random_block(1, Some(genesis.hash), None, Some(0)); let block1 = random_block(1, Some(genesis.hash), None, Some(0));
insert_blocks(env.db.as_ref(), [&genesis, &block1].into_iter()); insert_blocks(env.db.as_ref(), [&genesis, &block1].into_iter());
let _engine = spawn_consensus_engine(consensus_engine); let engine = spawn_consensus_engine(consensus_engine);
let rx = env.send_forkchoice_updated(ForkchoiceState { let rx = env.send_forkchoice_updated(ForkchoiceState {
head_block_hash: H256::random(), head_block_hash: H256::random(),
@ -981,7 +982,7 @@ mod tests {
}); });
let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing);
assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result));
drop(_engine); drop(engine);
} }
#[tokio::test] #[tokio::test]
@ -1007,10 +1008,10 @@ mod tests {
insert_blocks(env.db.as_ref(), [&genesis, &block1].into_iter()); insert_blocks(env.db.as_ref(), [&genesis, &block1].into_iter());
let _engine = spawn_consensus_engine(consensus_engine); let engine = spawn_consensus_engine(consensus_engine);
let rx = env.send_forkchoice_updated(ForkchoiceState { let rx = env.send_forkchoice_updated(ForkchoiceState {
head_block_hash: H256::random(), head_block_hash: block1.hash,
finalized_block_hash: block1.hash, finalized_block_hash: block1.hash,
..Default::default() ..Default::default()
}); });
@ -1027,7 +1028,7 @@ mod tests {
}) })
.with_latest_valid_hash(H256::zero()); .with_latest_valid_hash(H256::zero());
assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result));
drop(_engine); drop(engine);
} }
} }