chore(stages): remove execute_inner method (#8719)

This commit is contained in:
Alexey Shekhirin
2024-06-10 13:21:11 +01:00
committed by GitHub
parent 8717ed757b
commit e5832d5189

View File

@ -163,12 +163,28 @@ impl<E> ExecutionStage<E> {
}
}
impl<E> ExecutionStage<E>
impl<E, DB> Stage<DB> for ExecutionStage<E>
where
DB: Database,
E: BlockExecutorProvider,
{
/// Execute the stage.
pub fn execute_inner<DB: Database>(
/// Return the id of the stage
fn id(&self) -> StageId {
StageId::Execution
}
fn poll_execute_ready(
&mut self,
cx: &mut Context<'_>,
_: ExecInput,
) -> Poll<Result<(), StageError>> {
ready!(self.exex_manager_handle.poll_ready(cx));
Poll::Ready(Ok(()))
}
/// Execute the stage
fn execute(
&mut self,
provider: &DatabaseProviderRW<DB>,
input: ExecInput,
@ -321,6 +337,73 @@ where
done,
})
}
/// Unwind the stage.
fn unwind(
&mut self,
provider: &DatabaseProviderRW<DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (range, unwind_to, _) =
input.unwind_block_range_with_threshold(self.thresholds.max_blocks.unwrap_or(u64::MAX));
if range.is_empty() {
return Ok(UnwindOutput {
checkpoint: input.checkpoint.with_block_number(input.unwind_to),
})
}
// Unwind account and storage changesets, as well as receipts.
//
// This also updates `PlainStorageState` and `PlainAccountState`.
let bundle_state_with_receipts = provider.unwind_or_peek_state::<true>(range.clone())?;
// Construct a `ExExNotification` if we have ExEx's installed.
if self.exex_manager_handle.has_exexs() {
// Get the blocks for the unwound range. This is needed for `ExExNotification`.
let blocks = provider.get_take_block_range::<false>(range.clone())?;
let chain = Chain::new(blocks, bundle_state_with_receipts, None);
// NOTE: We can ignore the error here, since an error means that the channel is closed,
// which means the manager has died, which then in turn means the node is shutting down.
let _ = self
.exex_manager_handle
.send(ExExNotification::ChainReverted { old: Arc::new(chain) });
}
// Unwind all receipts for transactions in the block range
if self.prune_modes.receipts.is_none() && self.prune_modes.receipts_log_filter.is_empty() {
// We only use static files for Receipts, if there is no receipt pruning of any kind.
// prepare_static_file_producer does a consistency check that will unwind static files
// if the expected highest receipt in the files is higher than the database.
// Which is essentially what happens here when we unwind this stage.
let _static_file_producer = prepare_static_file_producer(provider, *range.start())?;
} else {
// If there is any kind of receipt pruning/filtering we use the database, since static
// files do not support filters.
//
// If we hit this case, the receipts have already been unwound by the call to
// `unwind_or_peek_state`.
}
// Update the checkpoint.
let mut stage_checkpoint = input.checkpoint.execution_stage_checkpoint();
if let Some(stage_checkpoint) = stage_checkpoint.as_mut() {
for block_number in range {
stage_checkpoint.progress.processed -= provider
.block_by_number(block_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?
.gas_used;
}
}
let checkpoint = if let Some(stage_checkpoint) = stage_checkpoint {
StageCheckpoint::new(unwind_to).with_execution_stage_checkpoint(stage_checkpoint)
} else {
StageCheckpoint::new(unwind_to)
};
Ok(UnwindOutput { checkpoint })
}
}
fn execution_checkpoint(
@ -413,103 +496,6 @@ fn calculate_gas_used_from_headers(
Ok(gas_total)
}
impl<E, DB> Stage<DB> for ExecutionStage<E>
where
DB: Database,
E: BlockExecutorProvider,
{
/// Return the id of the stage
fn id(&self) -> StageId {
StageId::Execution
}
fn poll_execute_ready(
&mut self,
cx: &mut Context<'_>,
_: ExecInput,
) -> Poll<Result<(), StageError>> {
ready!(self.exex_manager_handle.poll_ready(cx));
Poll::Ready(Ok(()))
}
/// Execute the stage
fn execute(
&mut self,
provider: &DatabaseProviderRW<DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
self.execute_inner(provider, input)
}
/// Unwind the stage.
fn unwind(
&mut self,
provider: &DatabaseProviderRW<DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (range, unwind_to, _) =
input.unwind_block_range_with_threshold(self.thresholds.max_blocks.unwrap_or(u64::MAX));
if range.is_empty() {
return Ok(UnwindOutput {
checkpoint: input.checkpoint.with_block_number(input.unwind_to),
})
}
// Unwind account and storage changesets, as well as receipts.
//
// This also updates `PlainStorageState` and `PlainAccountState`.
let bundle_state_with_receipts = provider.unwind_or_peek_state::<true>(range.clone())?;
// Construct a `ExExNotification` if we have ExEx's installed.
if self.exex_manager_handle.has_exexs() {
// Get the blocks for the unwound range. This is needed for `ExExNotification`.
let blocks = provider.get_take_block_range::<false>(range.clone())?;
let chain = Chain::new(blocks, bundle_state_with_receipts, None);
// NOTE: We can ignore the error here, since an error means that the channel is closed,
// which means the manager has died, which then in turn means the node is shutting down.
let _ = self
.exex_manager_handle
.send(ExExNotification::ChainReverted { old: Arc::new(chain) });
}
// Unwind all receipts for transactions in the block range
if self.prune_modes.receipts.is_none() && self.prune_modes.receipts_log_filter.is_empty() {
// We only use static files for Receipts, if there is no receipt pruning of any kind.
// prepare_static_file_producer does a consistency check that will unwind static files
// if the expected highest receipt in the files is higher than the database.
// Which is essentially what happens here when we unwind this stage.
let _static_file_producer = prepare_static_file_producer(provider, *range.start())?;
} else {
// If there is any kind of receipt pruning/filtering we use the database, since static
// files do not support filters.
//
// If we hit this case, the receipts have already been unwound by the call to
// `unwind_or_peek_state`.
}
// Update the checkpoint.
let mut stage_checkpoint = input.checkpoint.execution_stage_checkpoint();
if let Some(stage_checkpoint) = stage_checkpoint.as_mut() {
for block_number in range {
stage_checkpoint.progress.processed -= provider
.block_by_number(block_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?
.gas_used;
}
}
let checkpoint = if let Some(stage_checkpoint) = stage_checkpoint {
StageCheckpoint::new(unwind_to).with_execution_stage_checkpoint(stage_checkpoint)
} else {
StageCheckpoint::new(unwind_to)
};
Ok(UnwindOutput { checkpoint })
}
}
/// The thresholds at which the execution stage writes state changes to the database.
///
/// If either of the thresholds (`max_blocks` and `max_changes`) are hit, then the execution stage