fix: use proofs from prefetch and updates for root completion (#14222)

This commit is contained in:
Dan Cline
2025-02-05 13:30:13 -05:00
committed by GitHub
parent 7789d93001
commit af00d882d2

View File

@ -172,20 +172,10 @@ pub struct ProofCalculated {
sequence_number: u64, sequence_number: u64,
/// Sparse trie update /// Sparse trie update
update: SparseTrieUpdate, update: SparseTrieUpdate,
/// The source of the proof fetch, whether it was requested as a prefetch or as a result of a
/// state update.
source: ProofFetchSource,
/// The time taken to calculate the proof. /// The time taken to calculate the proof.
elapsed: Duration, elapsed: Duration,
} }
impl ProofCalculated {
/// Returns true if the proof was calculated as a result of a state update.
pub(crate) const fn is_from_state_update(&self) -> bool {
matches!(self.source, ProofFetchSource::StateUpdate)
}
}
/// Whether or not a proof was fetched due to a state update, or due to a prefetch command. /// Whether or not a proof was fetched due to a state update, or due to a prefetch command.
#[derive(Debug)] #[derive(Debug)]
pub enum ProofFetchSource { pub enum ProofFetchSource {
@ -318,7 +308,6 @@ struct MultiproofInput<Factory> {
proof_targets: MultiProofTargets, proof_targets: MultiProofTargets,
proof_sequence_number: u64, proof_sequence_number: u64,
state_root_message_sender: Sender<StateRootMessage>, state_root_message_sender: Sender<StateRootMessage>,
source: ProofFetchSource,
} }
/// Manages concurrent multiproof calculations. /// Manages concurrent multiproof calculations.
@ -388,7 +377,6 @@ where
proof_targets, proof_targets,
proof_sequence_number, proof_sequence_number,
state_root_message_sender, state_root_message_sender,
source,
} = input; } = input;
let thread_pool = self.thread_pool.clone(); let thread_pool = self.thread_pool.clone();
@ -419,7 +407,6 @@ where
targets: proof_targets, targets: proof_targets,
multiproof: proof, multiproof: proof,
}, },
source,
elapsed, elapsed,
}), }),
)); ));
@ -482,6 +469,7 @@ pub struct StateRootTask<Factory> {
thread_pool: Arc<rayon::ThreadPool>, thread_pool: Arc<rayon::ThreadPool>,
/// Manages calculation of multiproofs. /// Manages calculation of multiproofs.
multiproof_manager: MultiproofManager<Factory>, multiproof_manager: MultiproofManager<Factory>,
/// State root task metrics
metrics: StateRootTaskMetrics, metrics: StateRootTaskMetrics,
} }
@ -583,7 +571,6 @@ where
proof_targets: targets, proof_targets: targets,
proof_sequence_number: self.proof_sequencer.next_sequence(), proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(), state_root_message_sender: self.tx.clone(),
source: ProofFetchSource::Prefetch,
}); });
} }
@ -601,7 +588,6 @@ where
proof_targets, proof_targets,
proof_sequence_number, proof_sequence_number,
state_root_message_sender: self.tx.clone(), state_root_message_sender: self.tx.clone(),
source: ProofFetchSource::StateUpdate,
}); });
} }
@ -627,6 +613,7 @@ where
fn run(mut self, sparse_trie_tx: Sender<SparseTrieUpdate>) -> StateRootResult { fn run(mut self, sparse_trie_tx: Sender<SparseTrieUpdate>) -> StateRootResult {
let mut sparse_trie_tx = Some(sparse_trie_tx); let mut sparse_trie_tx = Some(sparse_trie_tx);
let mut prefetch_proofs_received = 0;
let mut updates_received = 0; let mut updates_received = 0;
let mut proofs_processed = 0; let mut proofs_processed = 0;
@ -643,9 +630,11 @@ where
Ok(message) => match message { Ok(message) => match message {
StateRootMessage::PrefetchProofs(targets) => { StateRootMessage::PrefetchProofs(targets) => {
trace!(target: "engine::root", "processing StateRootMessage::PrefetchProofs"); trace!(target: "engine::root", "processing StateRootMessage::PrefetchProofs");
prefetch_proofs_received += 1;
debug!( debug!(
target: "engine::root", target: "engine::root",
len = targets.len(), len = targets.len(),
total_prefetches = prefetch_proofs_received,
"Prefetching proofs" "Prefetching proofs"
); );
self.on_prefetch_proof(targets); self.on_prefetch_proof(targets);
@ -672,7 +661,8 @@ where
trace!(target: "engine::root", "processing StateRootMessage::FinishedStateUpdates"); trace!(target: "engine::root", "processing StateRootMessage::FinishedStateUpdates");
updates_finished = true; updates_finished = true;
let all_proofs_received = proofs_processed >= updates_received; let all_proofs_received =
proofs_processed >= updates_received + prefetch_proofs_received;
let no_pending = !self.proof_sequencer.has_pending(); let no_pending = !self.proof_sequencer.has_pending();
if all_proofs_received && no_pending { if all_proofs_received && no_pending {
// drop the sender // drop the sender
@ -687,9 +677,10 @@ where
} }
StateRootMessage::ProofCalculated(proof_calculated) => { StateRootMessage::ProofCalculated(proof_calculated) => {
trace!(target: "engine::root", "processing StateRootMessage::ProofCalculated"); trace!(target: "engine::root", "processing StateRootMessage::ProofCalculated");
if proof_calculated.is_from_state_update() {
proofs_processed += 1; // we increment proofs_processed for both state updates and prefetches,
} // because both are used for the root termination condition.
proofs_processed += 1;
self.metrics self.metrics
.proof_calculation_duration_histogram .proof_calculation_duration_histogram
@ -724,7 +715,8 @@ where
.send(combined_update); .send(combined_update);
} }
let all_proofs_received = proofs_processed >= updates_received; let all_proofs_received =
proofs_processed >= updates_received + prefetch_proofs_received;
let no_pending = !self.proof_sequencer.has_pending(); let no_pending = !self.proof_sequencer.has_pending();
if all_proofs_received && no_pending && updates_finished { if all_proofs_received && no_pending && updates_finished {
// drop the sender // drop the sender