feat: Return root result without blocking due to sparse trie Drop (#14333)

Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>
This commit is contained in:
Varun Doshi
2025-02-12 14:53:17 +05:30
committed by GitHub
parent e9b99b0610
commit faa6b9c125

View File

@ -571,13 +571,15 @@ where
let (tx, rx) = mpsc::channel();
thread_pool.spawn(move || {
debug!(target: "engine::tree", "Starting sparse trie task");
let result = match run_sparse_trie(config, metrics, rx) {
Ok((state_root, trie_updates, iterations)) => {
StateRootMessage::RootCalculated { state_root, trie_updates, iterations }
// We clone the task sender here so that it can be used in case the sparse trie task
// succeeds, without blocking due to any `Drop` implementation.
//
// It's more important to make sure we capture any errors, than to make sure we send an
// error result without blocking, which is why we wait for `run_sparse_trie` to return
// before sending errors.
if let Err(err) = run_sparse_trie(config, metrics, rx, task_tx.clone()) {
let _ = task_tx.send(StateRootMessage::RootCalculationError(err));
}
Err(error) => StateRootMessage::RootCalculationError(error),
};
let _ = task_tx.send(result);
});
tx
}
@ -958,13 +960,17 @@ fn check_end_condition(
/// Listen to incoming sparse trie updates and update the sparse trie.
///
/// Once the updates receiver channel is dropped, returns final state root, trie updates and the
/// number of update iterations.
/// Once the updates receiver channel is dropped, this sends the final state root, trie updates and
/// the number of update iterations to the `task_tx`.
///
/// This takes `task_tx` as an argument so that the state root result can be sent without blocking
/// on any of the `Drop` implementations run at the end of this method.
fn run_sparse_trie<Factory>(
config: StateRootConfig<Factory>,
metrics: StateRootTaskMetrics,
update_rx: mpsc::Receiver<SparseTrieUpdate>,
) -> Result<(B256, TrieUpdates, u64), ParallelStateRootError>
task_tx: Sender<StateRootMessage>,
) -> Result<(), ParallelStateRootError>
where
Factory: DatabaseProviderFactory<Provider: BlockReader> + StateCommitmentProvider,
{
@ -1011,11 +1017,17 @@ where
debug!(target: "engine::root", num_iterations, "All proofs processed, ending calculation");
let start = Instant::now();
let (root, trie_updates) = trie.root_with_updates().expect("sparse trie should be revealed");
let (state_root, trie_updates) =
trie.root_with_updates().expect("sparse trie should be revealed");
let elapsed = start.elapsed();
metrics.sparse_trie_final_update_duration_histogram.record(elapsed);
Ok((root, trie_updates, num_iterations))
let _ = task_tx.send(StateRootMessage::RootCalculated {
state_root,
trie_updates,
iterations: num_iterations,
});
Ok(())
}
/// Returns accounts only with those storages that were not already fetched, and