feat(debug): engine reorg util depth (#10575)

This commit is contained in:
Roman Krasiuk
2024-08-28 02:22:51 -07:00
committed by GitHub
parent 5b7d637426
commit f600ff07d9
6 changed files with 114 additions and 79 deletions

View File

@ -101,11 +101,19 @@ pub trait EngineMessageStreamExt<Engine: EngineTypes>:
evm_config: Evm,
payload_validator: ExecutionPayloadValidator,
frequency: usize,
depth: Option<usize>,
) -> EngineReorg<Self, Engine, Provider, Evm>
where
Self: Sized,
{
EngineReorg::new(self, provider, evm_config, payload_validator, frequency)
EngineReorg::new(
self,
provider,
evm_config,
payload_validator,
frequency,
depth.unwrap_or_default(),
)
}
/// If frequency is [Some], returns the stream that creates reorgs with
@ -116,6 +124,7 @@ pub trait EngineMessageStreamExt<Engine: EngineTypes>:
evm_config: Evm,
payload_validator: ExecutionPayloadValidator,
frequency: Option<usize>,
depth: Option<usize>,
) -> Either<EngineReorg<Self, Engine, Provider, Evm>, Self>
where
Self: Sized,
@ -127,6 +136,7 @@ pub trait EngineMessageStreamExt<Engine: EngineTypes>:
evm_config,
payload_validator,
frequency,
depth.unwrap_or_default(),
))
} else {
Either::Right(self)

View File

@ -61,6 +61,8 @@ pub struct EngineReorg<S, Engine: EngineTypes, Provider, Evm> {
payload_validator: ExecutionPayloadValidator,
/// The frequency of reorgs.
frequency: usize,
/// The depth of reorgs.
depth: usize,
/// The number of forwarded forkchoice states.
/// This is reset after a reorg.
forkchoice_states_forwarded: usize,
@ -80,6 +82,7 @@ impl<S, Engine: EngineTypes, Provider, Evm> EngineReorg<S, Engine, Provider, Evm
evm_config: Evm,
payload_validator: ExecutionPayloadValidator,
frequency: usize,
depth: usize,
) -> Self {
Self {
stream,
@ -87,6 +90,7 @@ impl<S, Engine: EngineTypes, Provider, Evm> EngineReorg<S, Engine, Provider, Evm
evm_config,
payload_validator,
frequency,
depth,
state: EngineReorgState::Forward,
forkchoice_states_forwarded: 0,
last_forkchoice_state: None,
@ -138,80 +142,76 @@ where
}
let next = ready!(this.stream.poll_next_unpin(cx));
let item = match next {
Some(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx }) => {
if this.forkchoice_states_forwarded > this.frequency {
if let Some(last_forkchoice_state) = this
.last_forkchoice_state
// Only enter reorg state if new payload attaches to current head.
.filter(|state| state.head_block_hash == payload.parent_hash())
{
// Enter the reorg state.
// The current payload will be immediately forwarded by being in front
// of the queue. Then we attempt to reorg the current head by generating
// a payload that attaches to the head's parent and is based on the
// non-conflicting transactions (txs from block `n + 1` that are valid
// at block `n` according to consensus checks) from the current payload
// as well as the corresponding forkchoice state. We will rely on CL to
// reorg us back to canonical chain.
// TODO: This is an expensive blocking operation, ideally it's spawned
// as a task so that the stream could yield the control back.
let (reorg_payload, reorg_cancun_fields) = match create_reorg_head(
this.provider,
this.evm_config,
this.payload_validator,
payload.clone(),
cancun_fields.clone(),
) {
Ok(result) => result,
Err(error) => {
error!(target: "engine::stream::reorg", %error, "Error attempting to create reorg head");
// Forward the payload and attempt to create reorg on top of the
// next one
return Poll::Ready(Some(BeaconEngineMessage::NewPayload {
payload,
cancun_fields,
tx,
}))
}
};
let reorg_forkchoice_state = ForkchoiceState {
finalized_block_hash: last_forkchoice_state.finalized_block_hash,
safe_block_hash: last_forkchoice_state.safe_block_hash,
head_block_hash: reorg_payload.block_hash(),
};
let (reorg_payload_tx, reorg_payload_rx) = oneshot::channel();
let (reorg_fcu_tx, reorg_fcu_rx) = oneshot::channel();
this.reorg_responses.extend([
Box::pin(reorg_payload_rx.map_ok(Either::Left)) as ReorgResponseFut,
Box::pin(reorg_fcu_rx.map_ok(Either::Right)) as ReorgResponseFut,
]);
*this.state = EngineReorgState::Reorg {
queue: VecDeque::from([
// Current payload
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx },
// Reorg payload
BeaconEngineMessage::NewPayload {
payload: reorg_payload,
cancun_fields: reorg_cancun_fields,
tx: reorg_payload_tx,
},
// Reorg forkchoice state
BeaconEngineMessage::ForkchoiceUpdated {
state: reorg_forkchoice_state,
payload_attrs: None,
tx: reorg_fcu_tx,
},
]),
};
continue
let item = match (next, &this.last_forkchoice_state) {
(
Some(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx }),
Some(last_forkchoice_state),
) if this.forkchoice_states_forwarded > this.frequency &&
// Only enter reorg state if new payload attaches to current head.
last_forkchoice_state.head_block_hash == payload.parent_hash() =>
{
// Enter the reorg state.
// The current payload will be immediately forwarded by being in front of the
// queue. Then we attempt to reorg the current head by generating a payload that
// attaches to the head's parent and is based on the non-conflicting
// transactions (txs from block `n + 1` that are valid at block `n` according to
// consensus checks) from the current payload as well as the corresponding
// forkchoice state. We will rely on CL to reorg us back to canonical chain.
// TODO: This is an expensive blocking operation, ideally it's spawned as a task
// so that the stream could yield the control back.
let (reorg_payload, reorg_cancun_fields) = match create_reorg_head(
this.provider,
this.evm_config,
this.payload_validator,
*this.depth,
payload.clone(),
cancun_fields.clone(),
) {
Ok(result) => result,
Err(error) => {
error!(target: "engine::stream::reorg", %error, "Error attempting to create reorg head");
// Forward the payload and attempt to create reorg on top of
// the next one
return Poll::Ready(Some(BeaconEngineMessage::NewPayload {
payload,
cancun_fields,
tx,
}))
}
}
Some(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx })
};
let reorg_forkchoice_state = ForkchoiceState {
finalized_block_hash: last_forkchoice_state.finalized_block_hash,
safe_block_hash: last_forkchoice_state.safe_block_hash,
head_block_hash: reorg_payload.block_hash(),
};
let (reorg_payload_tx, reorg_payload_rx) = oneshot::channel();
let (reorg_fcu_tx, reorg_fcu_rx) = oneshot::channel();
this.reorg_responses.extend([
Box::pin(reorg_payload_rx.map_ok(Either::Left)) as ReorgResponseFut,
Box::pin(reorg_fcu_rx.map_ok(Either::Right)) as ReorgResponseFut,
]);
let queue = VecDeque::from([
// Current payload
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx },
// Reorg payload
BeaconEngineMessage::NewPayload {
payload: reorg_payload,
cancun_fields: reorg_cancun_fields,
tx: reorg_payload_tx,
},
// Reorg forkchoice state
BeaconEngineMessage::ForkchoiceUpdated {
state: reorg_forkchoice_state,
payload_attrs: None,
tx: reorg_fcu_tx,
},
]);
*this.state = EngineReorgState::Reorg { queue };
continue
}
Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }) => {
(Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }), _) => {
// Record last forkchoice state forwarded to the engine.
// We do not care if it's valid since engine should be able to handle
// reorgs that rely on invalid forkchoice state.
@ -219,7 +219,7 @@ where
*this.forkchoice_states_forwarded += 1;
Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx })
}
item => item,
(item, _) => item,
};
return Poll::Ready(item)
}
@ -230,6 +230,7 @@ fn create_reorg_head<Provider, Evm>(
provider: &Provider,
evm_config: &Evm,
payload_validator: &ExecutionPayloadValidator,
mut depth: usize,
next_payload: ExecutionPayload,
next_cancun_fields: Option<CancunPayloadFields>,
) -> RethResult<(ExecutionPayload, Option<CancunPayloadFields>)>
@ -244,14 +245,29 @@ where
.ensure_well_formed_payload(next_payload, next_cancun_fields.into())
.map_err(RethError::msg)?;
// Fetch reorg target block and its parent
let reorg_target = provider
.block_by_hash(next_block.parent_hash)?
.ok_or_else(|| ProviderError::HeaderNotFound(next_block.parent_hash.into()))?;
// Fetch reorg target block depending on its depth and its parent.
let mut previous_hash = next_block.parent_hash;
let mut candidate_transactions = next_block.body;
let reorg_target = 'target: {
loop {
let reorg_target = provider
.block_by_hash(previous_hash)?
.ok_or_else(|| ProviderError::HeaderNotFound(previous_hash.into()))?;
if depth == 0 {
break 'target reorg_target
}
depth -= 1;
previous_hash = reorg_target.parent_hash;
candidate_transactions = reorg_target.body;
}
};
let reorg_target_parent = provider
.block_by_hash(reorg_target.parent_hash)?
.ok_or_else(|| ProviderError::HeaderNotFound(reorg_target.parent_hash.into()))?;
debug!(target: "engine::stream::reorg", number = reorg_target.number, hash = %previous_hash, "Selected reorg target");
// Configure state
let state_provider = provider.state_by_block_hash(reorg_target.parent_hash)?;
let mut state = State::builder()
@ -287,7 +303,7 @@ where
let mut transactions = Vec::new();
let mut receipts = Vec::new();
let mut versioned_hashes = Vec::new();
for tx in next_block.body {
for tx in candidate_transactions {
// ensure we still have capacity for this transaction
if cumulative_gas_used + tx.gas_limit() > reorg_target.gas_limit {
continue