From c4b147c031c6777fad66e3422f9fc55bbf2ae17b Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Tue, 21 Jan 2025 06:53:17 -0500 Subject: [PATCH] chore: move state root task result handling to fn (#13892) Co-authored-by: Roman Krasiuk Co-authored-by: Federico Gimenez --- crates/engine/tree/src/tree/mod.rs | 126 +++++++++++++++++------------ 1 file changed, 75 insertions(+), 51 deletions(-) diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 059fde3d7..a55792971 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -50,14 +50,14 @@ use reth_trie::{ hashed_cursor::HashedPostStateCursorFactory, prefix_set::TriePrefixSetsMut, proof::ProofBlindedProviderFactory, - trie_cursor::InMemoryTrieCursorFactory, + trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory}, updates::{TrieUpdates, TrieUpdatesSorted}, HashedPostState, HashedPostStateSorted, TrieInput, }; use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory}; use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError}; use revm_primitives::EvmState; -use root::{StateRootComputeOutcome, StateRootConfig, StateRootTask}; +use root::{StateRootComputeOutcome, StateRootConfig, StateRootHandle, StateRootTask}; use std::{ cmp::Ordering, collections::{btree_map, hash_map, BTreeMap, VecDeque}, @@ -67,7 +67,7 @@ use std::{ mpsc::{Receiver, RecvError, RecvTimeoutError, Sender}, Arc, }, - time::Instant, + time::{Duration, Instant}, }; use tokio::sync::{ mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, @@ -2364,55 +2364,20 @@ where // different view of the database. let (state_root, trie_updates, root_elapsed) = if persistence_not_in_progress { if self.config.use_state_root_task() { - match state_root_handle - .expect("state root handle must exist if use_state_root_task is true") - .wait_for_result() - { - Ok(StateRootComputeOutcome { - state_root: (task_state_root, task_trie_updates), - time_from_last_update, - .. - }) => { - info!( - target: "engine::tree", - block = ?sealed_block.num_hash(), - ?task_state_root, - task_elapsed = ?time_from_last_update, - "Task state root finished" - ); + let state_root_handle = state_root_handle + .expect("state root handle must exist if use_state_root_task is true"); + let in_memory_trie_cursor = in_memory_trie_cursor + .expect("in memory trie cursor must exist if use_state_root_task is true"); - if task_state_root != block.header().state_root() || - self.config.always_compare_trie_updates() - { - if task_state_root != block.header().state_root() { - debug!(target: "engine::tree", "Task state root does not match block state root"); - } - - let (regular_root, regular_updates) = - state_provider.state_root_with_updates(hashed_state.clone())?; - - if regular_root == block.header().state_root() { - compare_trie_updates( - in_memory_trie_cursor.expect("in memory trie cursor must exist if use_state_root_task is true"), - task_trie_updates.clone(), - regular_updates, - ) - .map_err(ProviderError::from)?; - } else { - debug!(target: "engine::tree", "Regular state root does not match block state root"); - } - } - - (task_state_root, task_trie_updates, time_from_last_update) - } - Err(error) => { - info!(target: "engine::tree", ?error, "Failed to wait for state root task result"); - // Fall back to sequential calculation - let (root, updates) = - state_provider.state_root_with_updates(hashed_state.clone())?; - (root, updates, root_time.elapsed()) - } - } + // Handle state root result from task using handle + self.handle_state_root_result( + state_root_handle, + sealed_block.as_ref(), + &hashed_state, + &state_provider, + in_memory_trie_cursor, + root_time, + )? } else { match self .compute_state_root_parallel(block.header().parent_hash(), &hashed_state) @@ -2589,6 +2554,65 @@ where )) } + /// Waits for the result on the input [`StateRootHandle`], and handles it, falling back to + /// the hash builder-based state root calculation if it fails. + fn handle_state_root_result( + &self, + state_root_handle: StateRootHandle, + sealed_block: &SealedBlock, + hashed_state: &HashedPostState, + state_provider: impl StateRootProvider, + in_memory_trie_cursor: impl TrieCursorFactory, + root_time: Instant, + ) -> Result<(B256, TrieUpdates, Duration), InsertBlockErrorKind> { + match state_root_handle.wait_for_result() { + Ok(StateRootComputeOutcome { + state_root: (task_state_root, task_trie_updates), + time_from_last_update, + .. + }) => { + info!( + target: "engine::tree", + block = ?sealed_block.num_hash(), + ?task_state_root, + task_elapsed = ?time_from_last_update, + "Task state root finished" + ); + + if task_state_root != sealed_block.header().state_root() || + self.config.always_compare_trie_updates() + { + if task_state_root != sealed_block.header().state_root() { + debug!(target: "engine::tree", "Task state root does not match block state root"); + } + + let (regular_root, regular_updates) = + state_provider.state_root_with_updates(hashed_state.clone())?; + + if regular_root == sealed_block.header().state_root() { + compare_trie_updates( + in_memory_trie_cursor, + task_trie_updates.clone(), + regular_updates, + ) + .map_err(ProviderError::from)?; + } else { + debug!(target: "engine::tree", "Regular state root does not match block state root"); + } + } + + Ok((task_state_root, task_trie_updates, time_from_last_update)) + } + Err(error) => { + info!(target: "engine::tree", ?error, "Failed to wait for state root task result"); + // Fall back to sequential calculation + let (root, updates) = + state_provider.state_root_with_updates(hashed_state.clone())?; + Ok((root, updates, root_time.elapsed())) + } + } + } + /// Attempts to find the header for the given block hash if it is canonical. pub fn find_canonical_header( &self,