chore: move state root task result handling to fn (#13892)

Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
Co-authored-by: Federico Gimenez <fgimenez@users.noreply.github.com>
This commit is contained in:
Dan Cline
2025-01-21 06:53:17 -05:00
committed by GitHub
parent 50dae68dce
commit c4b147c031

View File

@ -50,14 +50,14 @@ use reth_trie::{
hashed_cursor::HashedPostStateCursorFactory,
prefix_set::TriePrefixSetsMut,
proof::ProofBlindedProviderFactory,
trie_cursor::InMemoryTrieCursorFactory,
trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
updates::{TrieUpdates, TrieUpdatesSorted},
HashedPostState, HashedPostStateSorted, TrieInput,
};
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
use revm_primitives::EvmState;
use root::{StateRootComputeOutcome, StateRootConfig, StateRootTask};
use root::{StateRootComputeOutcome, StateRootConfig, StateRootHandle, StateRootTask};
use std::{
cmp::Ordering,
collections::{btree_map, hash_map, BTreeMap, VecDeque},
@ -67,7 +67,7 @@ use std::{
mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
Arc,
},
time::Instant,
time::{Duration, Instant},
};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
@ -2364,55 +2364,20 @@ where
// different view of the database.
let (state_root, trie_updates, root_elapsed) = if persistence_not_in_progress {
if self.config.use_state_root_task() {
match state_root_handle
.expect("state root handle must exist if use_state_root_task is true")
.wait_for_result()
{
Ok(StateRootComputeOutcome {
state_root: (task_state_root, task_trie_updates),
time_from_last_update,
..
}) => {
info!(
target: "engine::tree",
block = ?sealed_block.num_hash(),
?task_state_root,
task_elapsed = ?time_from_last_update,
"Task state root finished"
);
let state_root_handle = state_root_handle
.expect("state root handle must exist if use_state_root_task is true");
let in_memory_trie_cursor = in_memory_trie_cursor
.expect("in memory trie cursor must exist if use_state_root_task is true");
if task_state_root != block.header().state_root() ||
self.config.always_compare_trie_updates()
{
if task_state_root != block.header().state_root() {
debug!(target: "engine::tree", "Task state root does not match block state root");
}
let (regular_root, regular_updates) =
state_provider.state_root_with_updates(hashed_state.clone())?;
if regular_root == block.header().state_root() {
compare_trie_updates(
in_memory_trie_cursor.expect("in memory trie cursor must exist if use_state_root_task is true"),
task_trie_updates.clone(),
regular_updates,
)
.map_err(ProviderError::from)?;
} else {
debug!(target: "engine::tree", "Regular state root does not match block state root");
}
}
(task_state_root, task_trie_updates, time_from_last_update)
}
Err(error) => {
info!(target: "engine::tree", ?error, "Failed to wait for state root task result");
// Fall back to sequential calculation
let (root, updates) =
state_provider.state_root_with_updates(hashed_state.clone())?;
(root, updates, root_time.elapsed())
}
}
// Handle state root result from task using handle
self.handle_state_root_result(
state_root_handle,
sealed_block.as_ref(),
&hashed_state,
&state_provider,
in_memory_trie_cursor,
root_time,
)?
} else {
match self
.compute_state_root_parallel(block.header().parent_hash(), &hashed_state)
@ -2589,6 +2554,65 @@ where
))
}
/// Waits for the result on the input [`StateRootHandle`], and handles it, falling back to
/// the hash builder-based state root calculation if it fails.
fn handle_state_root_result(
&self,
state_root_handle: StateRootHandle,
sealed_block: &SealedBlock<N::Block>,
hashed_state: &HashedPostState,
state_provider: impl StateRootProvider,
in_memory_trie_cursor: impl TrieCursorFactory,
root_time: Instant,
) -> Result<(B256, TrieUpdates, Duration), InsertBlockErrorKind> {
match state_root_handle.wait_for_result() {
Ok(StateRootComputeOutcome {
state_root: (task_state_root, task_trie_updates),
time_from_last_update,
..
}) => {
info!(
target: "engine::tree",
block = ?sealed_block.num_hash(),
?task_state_root,
task_elapsed = ?time_from_last_update,
"Task state root finished"
);
if task_state_root != sealed_block.header().state_root() ||
self.config.always_compare_trie_updates()
{
if task_state_root != sealed_block.header().state_root() {
debug!(target: "engine::tree", "Task state root does not match block state root");
}
let (regular_root, regular_updates) =
state_provider.state_root_with_updates(hashed_state.clone())?;
if regular_root == sealed_block.header().state_root() {
compare_trie_updates(
in_memory_trie_cursor,
task_trie_updates.clone(),
regular_updates,
)
.map_err(ProviderError::from)?;
} else {
debug!(target: "engine::tree", "Regular state root does not match block state root");
}
}
Ok((task_state_root, task_trie_updates, time_from_last_update))
}
Err(error) => {
info!(target: "engine::tree", ?error, "Failed to wait for state root task result");
// Fall back to sequential calculation
let (root, updates) =
state_provider.state_root_with_updates(hashed_state.clone())?;
Ok((root, updates, root_time.elapsed()))
}
}
}
/// Attempts to find the header for the given block hash if it is canonical.
pub fn find_canonical_header(
&self,