feat(evm, root): pass state change source to state hook (#14494)

This commit is contained in:
Alexey Shekhirin
2025-02-14 17:04:23 +00:00
committed by GitHub
parent ab4b1764ad
commit b6198b1f12
6 changed files with 119 additions and 42 deletions

View File

@ -7,7 +7,7 @@ use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use proptest::test_runner::TestRunner;
use rand::Rng;
use reth_engine_tree::tree::root::{StateRootConfig, StateRootTask};
use reth_evm::system_calls::OnStateHook;
use reth_evm::system_calls::{OnStateHook, StateChangeSource};
use reth_primitives_traits::{Account as RethAccount, StorageEntry};
use reth_provider::{
providers::ConsistentDbView,
@ -225,8 +225,8 @@ fn bench_state_root(c: &mut Criterion) {
let mut hook = task.state_hook();
let handle = task.spawn();
for update in state_updates {
hook.on_state(&update)
for (i, update) in state_updates.into_iter().enumerate() {
hook.on_state(StateChangeSource::Transaction(i), &update)
}
drop(hook);

View File

@ -5,7 +5,7 @@ use derive_more::derive::Deref;
use metrics::Histogram;
use rayon::iter::{ParallelBridge, ParallelIterator};
use reth_errors::{ProviderError, ProviderResult};
use reth_evm::system_calls::OnStateHook;
use reth_evm::system_calls::{OnStateHook, StateChangeSource};
use reth_metrics::Metrics;
use reth_provider::{
providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory,
@ -146,8 +146,8 @@ impl<Factory> StateRootConfig<Factory> {
pub enum StateRootMessage {
/// Prefetch proof targets
PrefetchProofs(MultiProofTargets),
/// New state update from transaction execution
StateUpdate(EvmState),
/// New state update from transaction execution with its source
StateUpdate(StateChangeSource, EvmState),
/// Empty proof for a specific state update
EmptyProof {
/// The index of this proof in the sequence of state updates
@ -316,6 +316,7 @@ fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
#[derive(Debug)]
struct MultiproofInput<Factory> {
config: StateRootConfig<Factory>,
source: Option<StateChangeSource>,
hashed_state_update: HashedPostState,
proof_targets: MultiProofTargets,
proof_sequence_number: u64,
@ -391,14 +392,17 @@ where
}
/// Spawns a multiproof calculation.
fn spawn_multiproof(&mut self, input: MultiproofInput<Factory>) {
let MultiproofInput {
fn spawn_multiproof(
&mut self,
MultiproofInput {
config,
source,
hashed_state_update,
proof_targets,
proof_sequence_number,
state_root_message_sender,
} = input;
}: MultiproofInput<Factory>,
) {
let thread_pool = self.thread_pool.clone();
self.thread_pool.spawn(move || {
@ -420,6 +424,7 @@ where
target: "engine::root",
proof_sequence_number,
?elapsed,
?source,
account_targets,
storage_targets,
"Multiproof calculated",
@ -536,8 +541,10 @@ where
pub fn state_hook(&self) -> impl OnStateHook {
let state_hook = self.state_hook_sender();
move |state: &EvmState| {
if let Err(error) = state_hook.send(StateRootMessage::StateUpdate(state.clone())) {
move |source: StateChangeSource, state: &EvmState| {
if let Err(error) =
state_hook.send(StateRootMessage::StateUpdate(source, state.clone()))
{
error!(target: "engine::root", ?error, "Failed to send state update");
}
}
@ -595,6 +602,7 @@ where
self.multiproof_manager.spawn_or_queue(MultiproofInput {
config: self.config.clone(),
source: None,
hashed_state_update: Default::default(),
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
@ -655,13 +663,19 @@ where
/// Handles state updates.
///
/// Returns proof targets derived from the state update.
fn on_state_update(&mut self, update: EvmState, proof_sequence_number: u64) {
fn on_state_update(
&mut self,
source: StateChangeSource,
update: EvmState,
proof_sequence_number: u64,
) {
let hashed_state_update = evm_state_to_hashed_post_state(update);
let proof_targets = get_proof_targets(&hashed_state_update, &self.fetched_proof_targets);
extend_multi_proof_targets_ref(&mut self.fetched_proof_targets, &proof_targets);
self.multiproof_manager.spawn_or_queue(MultiproofInput {
config: self.config.clone(),
source: Some(source),
hashed_state_update,
proof_targets,
proof_sequence_number,
@ -759,7 +773,7 @@ where
);
self.on_prefetch_proof(targets);
}
StateRootMessage::StateUpdate(update) => {
StateRootMessage::StateUpdate(source, update) => {
trace!(target: "engine::root", "processing StateRootMessage::StateUpdate");
if updates_received == 0 {
first_update_time = Some(Instant::now());
@ -770,12 +784,13 @@ where
updates_received += 1;
debug!(
target: "engine::root",
?source,
len = update.len(),
total_updates = updates_received,
"Received new state update"
);
let next_sequence = self.proof_sequencer.next_sequence();
self.on_state_update(update, next_sequence);
self.on_state_update(source, update, next_sequence);
}
StateRootMessage::FinishedStateUpdates => {
trace!(target: "engine::root", "processing StateRootMessage::FinishedStateUpdates");
@ -1168,6 +1183,7 @@ fn extend_multi_proof_targets_ref(targets: &mut MultiProofTargets, other: &Multi
mod tests {
use super::*;
use alloy_primitives::map::B256Set;
use reth_evm::system_calls::StateChangeSource;
use reth_primitives_traits::{Account as RethAccount, StorageEntry};
use reth_provider::{
providers::ConsistentDbView, test_utils::create_test_provider_factory, HashingWriter,
@ -1344,8 +1360,8 @@ mod tests {
let mut state_hook = task.state_hook();
let handle = task.spawn();
for update in state_updates {
state_hook.on_state(&update);
for (i, update) in state_updates.into_iter().enumerate() {
state_hook.on_state(StateChangeSource::Transaction(i), &update);
}
drop(state_hook);

View File

@ -16,7 +16,7 @@ use reth_evm::{
BlockExecutionStrategy, BlockExecutionStrategyFactory, BlockValidationError, ExecuteOutput,
},
state_change::post_block_balance_increments,
system_calls::{OnStateHook, SystemCaller},
system_calls::{OnStateHook, StateChangePostBlockSource, StateChangeSource, SystemCaller},
ConfigureEvm, Database, Evm,
};
use reth_primitives::{EthPrimitives, Receipt, RecoveredBlock};
@ -140,7 +140,7 @@ where
let mut cumulative_gas_used = 0;
let mut receipts = Vec::with_capacity(block.body().transaction_count());
for (sender, transaction) in block.transactions_with_sender() {
for (tx_index, (sender, transaction)) in block.transactions_with_sender().enumerate() {
// The sum of the transaction's gas limit, Tg, and the gas utilized in this block prior,
// must be no greater than the block's gasLimit.
let block_available_gas = block.gas_limit() - cumulative_gas_used;
@ -162,7 +162,8 @@ where
error: Box::new(err),
}
})?;
self.system_caller.on_state(&result_and_state.state);
self.system_caller
.on_state(StateChangeSource::Transaction(tx_index), &result_and_state.state);
let ResultAndState { result, state } = result_and_state;
evm.db_mut().commit(state);
@ -228,7 +229,10 @@ where
.map_err(|_| BlockValidationError::IncrementBalanceFailed)?;
// call state hook with changes due to balance increments.
let balance_state = balance_increment_state(&balance_increments, &mut self.state)?;
self.system_caller.on_state(&balance_state);
self.system_caller.on_state(
StateChangeSource::PostBlock(StateChangePostBlockSource::BalanceIncrements),
&balance_state,
);
Ok(requests)
}
@ -1131,7 +1135,7 @@ mod tests {
let tx_clone = tx.clone();
let _output = executor
.execute_with_state_hook(block, move |state: &EvmState| {
.execute_with_state_hook(block, move |_, state: &EvmState| {
if let Some(account) = state.get(&withdrawal_recipient) {
let _ = tx_clone.send(account.info.balance);
}

View File

@ -2,7 +2,11 @@
//!
//! Block processing related to syncing should take care to update the metrics by using either
//! [`ExecutorMetrics::execute_metered`] or [`ExecutorMetrics::metered_one`].
use crate::{execute::Executor, system_calls::OnStateHook, Database};
use crate::{
execute::Executor,
system_calls::{OnStateHook, StateChangeSource},
Database,
};
use alloy_consensus::BlockHeader;
use metrics::{Counter, Gauge, Histogram};
use reth_execution_types::BlockExecutionOutput;
@ -18,7 +22,7 @@ struct MeteredStateHook {
}
impl OnStateHook for MeteredStateHook {
fn on_state(&mut self, state: &EvmState) {
fn on_state(&mut self, source: StateChangeSource, state: &EvmState) {
// Update the metrics for the number of accounts, storage slots and bytecodes loaded
let accounts = state.keys().len();
let storage_slots = state.values().map(|account| account.storage.len()).sum::<usize>();
@ -33,7 +37,7 @@ impl OnStateHook for MeteredStateHook {
self.metrics.bytecodes_loaded_histogram.record(bytecodes as f64);
// Call the original state hook
self.inner_hook.on_state(state);
self.inner_hook.on_state(source, state);
}
}
@ -178,7 +182,7 @@ mod tests {
F: OnStateHook + 'static,
{
// Call hook with our mock state
hook.on_state(&self.state);
hook.on_state(StateChangeSource::Transaction(0), &self.state);
Ok(BlockExecutionResult {
receipts: vec![],
@ -202,7 +206,7 @@ mod tests {
}
impl OnStateHook for ChannelStateHook {
fn on_state(&mut self, _state: &EvmState) {
fn on_state(&mut self, _source: StateChangeSource, _state: &EvmState) {
let _ = self.sender.send(self.output);
}
}

View File

@ -20,16 +20,49 @@ mod eip7251;
/// A hook that is called after each state change.
pub trait OnStateHook {
/// Invoked with the state after each system call.
fn on_state(&mut self, state: &EvmState);
/// Invoked with the source of the change and the state after each system call.
fn on_state(&mut self, source: StateChangeSource, state: &EvmState);
}
/// Source of the state change
#[derive(Debug, Clone, Copy)]
pub enum StateChangeSource {
/// Transaction with its index
Transaction(usize),
/// Pre-block state transition
PreBlock(StateChangePreBlockSource),
/// Post-block state transition
PostBlock(StateChangePostBlockSource),
}
/// Source of the pre-block state change
#[derive(Debug, Clone, Copy)]
pub enum StateChangePreBlockSource {
/// EIP-2935 blockhashes contract
BlockHashesContract,
/// EIP-4788 beacon root contract
BeaconRootContract,
/// EIP-7002 withdrawal requests contract
WithdrawalRequestsContract,
}
/// Source of the post-block state change
#[derive(Debug, Clone, Copy)]
pub enum StateChangePostBlockSource {
/// Balance increments from block rewards and withdrawals
BalanceIncrements,
/// EIP-7002 withdrawal requests contract
WithdrawalRequestsContract,
/// EIP-7251 consolidation requests contract
ConsolidationRequestsContract,
}
impl<F> OnStateHook for F
where
F: FnMut(&EvmState),
F: FnMut(StateChangeSource, &EvmState),
{
fn on_state(&mut self, state: &EvmState) {
self(state)
fn on_state(&mut self, source: StateChangeSource, state: &EvmState) {
self(source, state)
}
}
@ -39,7 +72,7 @@ where
pub struct NoopHook;
impl OnStateHook for NoopHook {
fn on_state(&mut self, _state: &EvmState) {}
fn on_state(&mut self, _source: StateChangeSource, _state: &EvmState) {}
}
/// An ephemeral helper type for executing system calls.
@ -161,7 +194,10 @@ where
if let Some(res) = result_and_state {
if let Some(ref mut hook) = self.hook {
hook.on_state(&res.state);
hook.on_state(
StateChangeSource::PreBlock(StateChangePreBlockSource::BlockHashesContract),
&res.state,
);
}
evm.db_mut().commit(res.state);
}
@ -211,7 +247,10 @@ where
if let Some(res) = result_and_state {
if let Some(ref mut hook) = self.hook {
hook.on_state(&res.state);
hook.on_state(
StateChangeSource::PreBlock(StateChangePreBlockSource::BeaconRootContract),
&res.state,
);
}
evm.db_mut().commit(res.state);
}
@ -245,7 +284,12 @@ where
let result_and_state = eip7002::transact_withdrawal_requests_contract_call(evm)?;
if let Some(ref mut hook) = self.hook {
hook.on_state(&result_and_state.state);
hook.on_state(
StateChangeSource::PostBlock(
StateChangePostBlockSource::WithdrawalRequestsContract,
),
&result_and_state.state,
);
}
evm.db_mut().commit(result_and_state.state);
@ -277,7 +321,12 @@ where
let result_and_state = eip7251::transact_consolidation_requests_contract_call(evm)?;
if let Some(ref mut hook) = self.hook {
hook.on_state(&result_and_state.state);
hook.on_state(
StateChangeSource::PostBlock(
StateChangePostBlockSource::ConsolidationRequestsContract,
),
&result_and_state.state,
);
}
evm.db_mut().commit(result_and_state.state);
@ -285,9 +334,9 @@ where
}
/// Delegate to stored `OnStateHook`, noop if hook is `None`.
pub fn on_state(&mut self, state: &EvmState) {
pub fn on_state(&mut self, source: StateChangeSource, state: &EvmState) {
if let Some(ref mut hook) = &mut self.hook {
hook.on_state(state);
hook.on_state(source, state);
}
}
}

View File

@ -15,7 +15,7 @@ use reth_evm::{
BlockExecutionStrategy, BlockExecutionStrategyFactory, BlockValidationError, ExecuteOutput,
},
state_change::post_block_balance_increments,
system_calls::{OnStateHook, SystemCaller},
system_calls::{OnStateHook, StateChangePostBlockSource, StateChangeSource, SystemCaller},
ConfigureEvmFor, Database, Evm,
};
use reth_optimism_chainspec::OpChainSpec;
@ -175,7 +175,7 @@ where
let mut cumulative_gas_used = 0;
let mut receipts = Vec::with_capacity(block.body().transaction_count());
for (sender, transaction) in block.transactions_with_sender() {
for (tx_index, (sender, transaction)) in block.transactions_with_sender().enumerate() {
// The sum of the transactions gas limit, Tg, and the gas utilized in this block prior,
// must be no greater than the blocks gasLimit.
let block_available_gas = block.gas_limit() - cumulative_gas_used;
@ -219,7 +219,8 @@ where
?transaction,
"Executed transaction"
);
self.system_caller.on_state(&result_and_state.state);
self.system_caller
.on_state(StateChangeSource::Transaction(tx_index), &result_and_state.state);
let ResultAndState { result, state } = result_and_state;
evm.db_mut().commit(state);
@ -275,7 +276,10 @@ where
.map_err(|_| BlockValidationError::IncrementBalanceFailed)?;
// call state hook with changes due to balance increments.
let balance_state = balance_increment_state(&balance_increments, &mut self.state)?;
self.system_caller.on_state(&balance_state);
self.system_caller.on_state(
StateChangeSource::PostBlock(StateChangePostBlockSource::BalanceIncrements),
&balance_state,
);
Ok(Requests::default())
}