feat(root): support proof prefetch in the task (#13428)

Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
This commit is contained in:
Roman Krasiuk
2024-12-17 18:02:46 +01:00
committed by GitHub
parent 48fee88cf0
commit 1e402fae87

View File

@ -1,9 +1,9 @@
//! State root task related functionality. //! State root task related functionality.
use alloy_primitives::map::HashSet; use alloy_primitives::{map::HashSet, Address};
use derive_more::derive::Deref; use derive_more::derive::Deref;
use rayon::iter::{ParallelBridge, ParallelIterator}; use rayon::iter::{ParallelBridge, ParallelIterator};
use reth_errors::ProviderError; use reth_errors::{ProviderError, ProviderResult};
use reth_evm::system_calls::OnStateHook; use reth_evm::system_calls::OnStateHook;
use reth_provider::{ use reth_provider::{
providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory,
@ -108,6 +108,8 @@ impl<Factory> StateRootConfig<Factory> {
#[derive(Debug)] #[derive(Debug)]
#[allow(dead_code)] #[allow(dead_code)]
pub enum StateRootMessage<BPF: BlindedProviderFactory> { pub enum StateRootMessage<BPF: BlindedProviderFactory> {
/// Prefetch proof targets
PrefetchProofs(HashSet<Address>),
/// New state update from transaction execution /// New state update from transaction execution
StateUpdate(EvmState), StateUpdate(EvmState),
/// Proof calculation completed for a specific state update /// Proof calculation completed for a specific state update
@ -340,6 +342,29 @@ where
} }
} }
/// Handles request for proof prefetch.
fn on_prefetch_proof(
scope: &rayon::Scope<'env>,
config: StateRootConfig<Factory>,
targets: HashSet<Address>,
fetched_proof_targets: &mut MultiProofTargets,
proof_sequence_number: u64,
state_root_message_sender: Sender<StateRootMessage<BPF>>,
) {
let proof_targets =
targets.into_iter().map(|address| (keccak256(address), Default::default())).collect();
extend_multi_proof_targets_ref(fetched_proof_targets, &proof_targets);
Self::spawn_multiproof(
scope,
config,
Default::default(),
proof_targets,
proof_sequence_number,
state_root_message_sender,
);
}
/// Handles state updates. /// Handles state updates.
/// ///
/// Returns proof targets derived from the state update. /// Returns proof targets derived from the state update.
@ -356,46 +381,39 @@ where
let proof_targets = get_proof_targets(&hashed_state_update, fetched_proof_targets); let proof_targets = get_proof_targets(&hashed_state_update, fetched_proof_targets);
extend_multi_proof_targets_ref(fetched_proof_targets, &proof_targets); extend_multi_proof_targets_ref(fetched_proof_targets, &proof_targets);
// Dispatch proof gathering for this state update Self::spawn_multiproof(
scope.spawn(move |_| { scope,
let provider = match config.consistent_view.provider_ro() { config,
Ok(provider) => provider, hashed_state_update,
Err(error) => { proof_targets,
error!(target: "engine::root", ?error, "Could not get provider"); proof_sequence_number,
let _ = state_root_message_sender state_root_message_sender,
.send(StateRootMessage::ProofCalculationError(error)); );
return; }
}
};
// TODO: replace with parallel proof fn spawn_multiproof(
let result = Proof::from_tx(provider.tx_ref()) scope: &rayon::Scope<'env>,
.with_trie_cursor_factory(InMemoryTrieCursorFactory::new( config: StateRootConfig<Factory>,
DatabaseTrieCursorFactory::new(provider.tx_ref()), hashed_state_update: HashedPostState,
&config.nodes_sorted, proof_targets: MultiProofTargets,
)) proof_sequence_number: u64,
.with_hashed_cursor_factory(HashedPostStateCursorFactory::new( state_root_message_sender: Sender<StateRootMessage<BPF>>,
DatabaseHashedCursorFactory::new(provider.tx_ref()), ) {
&config.state_sorted, // Dispatch proof gathering for this state update
)) scope.spawn(move |_| match calculate_multiproof(config, proof_targets.clone()) {
.with_prefix_sets_mut(config.prefix_sets.as_ref().clone()) Ok(proof) => {
.with_branch_node_hash_masks(true) let _ = state_root_message_sender.send(StateRootMessage::ProofCalculated(
.multiproof(proof_targets.clone()); Box::new(ProofCalculated {
match result { state_update: hashed_state_update,
Ok(proof) => { targets: proof_targets,
let _ = state_root_message_sender.send(StateRootMessage::ProofCalculated( proof,
Box::new(ProofCalculated { sequence_number: proof_sequence_number,
state_update: hashed_state_update, }),
targets: proof_targets, ));
proof, }
sequence_number: proof_sequence_number, Err(error) => {
}), let _ =
)); state_root_message_sender.send(StateRootMessage::ProofCalculationError(error));
}
Err(error) => {
let _ = state_root_message_sender
.send(StateRootMessage::ProofCalculationError(error.into()));
}
} }
}); });
} }
@ -486,6 +504,21 @@ where
loop { loop {
match self.rx.recv() { match self.rx.recv() {
Ok(message) => match message { Ok(message) => match message {
StateRootMessage::PrefetchProofs(targets) => {
debug!(
target: "engine::root",
len = targets.len(),
"Prefetching proofs"
);
Self::on_prefetch_proof(
scope,
self.config.clone(),
targets,
&mut self.fetched_proof_targets,
self.proof_sequencer.next_sequence(),
self.tx.clone(),
);
}
StateRootMessage::StateUpdate(update) => { StateRootMessage::StateUpdate(update) => {
if updates_received == 0 { if updates_received == 0 {
first_update_time = Some(Instant::now()); first_update_time = Some(Instant::now());
@ -681,6 +714,31 @@ fn get_proof_targets(
targets targets
} }
/// Calculate multiproof for the targets.
#[inline]
fn calculate_multiproof<Factory>(
config: StateRootConfig<Factory>,
proof_targets: MultiProofTargets,
) -> ProviderResult<MultiProof>
where
Factory: DatabaseProviderFactory<Provider: BlockReader> + StateCommitmentProvider,
{
let provider = config.consistent_view.provider_ro()?;
Ok(Proof::from_tx(provider.tx_ref())
.with_trie_cursor_factory(InMemoryTrieCursorFactory::new(
DatabaseTrieCursorFactory::new(provider.tx_ref()),
&config.nodes_sorted,
))
.with_hashed_cursor_factory(HashedPostStateCursorFactory::new(
DatabaseHashedCursorFactory::new(provider.tx_ref()),
&config.state_sorted,
))
.with_prefix_sets_mut(config.prefix_sets.as_ref().clone())
.with_branch_node_hash_masks(true)
.multiproof(proof_targets)?)
}
/// Updates the sparse trie with the given proofs and state, and returns the updated trie and the /// Updates the sparse trie with the given proofs and state, and returns the updated trie and the
/// time it took. /// time it took.
fn update_sparse_trie< fn update_sparse_trie<