From 107dfaeaa987928a1ca339f599e84e0d68644cc0 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Tue, 7 Jan 2025 11:27:16 +0100 Subject: [PATCH] feat(engine): wire StateRootTask in EngineApiTreeHandler (#12639) Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com> Co-authored-by: Roman Krasiuk --- bin/reth/src/main.rs | 8 +- book/cli/reth/node.md | 3 + crates/engine/tree/Cargo.toml | 3 +- crates/engine/tree/src/tree/config.rs | 16 ++ crates/engine/tree/src/tree/mod.rs | 270 ++++++++++++++++---------- 5 files changed, 200 insertions(+), 100 deletions(-) diff --git a/bin/reth/src/main.rs b/bin/reth/src/main.rs index e146912c0..f1f0a7d68 100644 --- a/bin/reth/src/main.rs +++ b/bin/reth/src/main.rs @@ -39,6 +39,10 @@ pub struct EngineArgs { /// Configure the target number of blocks to keep in memory. #[arg(long = "engine.memory-block-buffer-target", conflicts_with = "legacy", default_value_t = DEFAULT_MEMORY_BLOCK_BUFFER_TARGET)] pub memory_block_buffer_target: u64, + + /// Enable state root task + #[arg(long = "engine.state-root-task", conflicts_with = "legacy")] + pub state_root_task_enabled: bool, } impl Default for EngineArgs { @@ -48,6 +52,7 @@ impl Default for EngineArgs { legacy: false, persistence_threshold: DEFAULT_PERSISTENCE_THRESHOLD, memory_block_buffer_target: DEFAULT_MEMORY_BLOCK_BUFFER_TARGET, + state_root_task_enabled: false, } } } @@ -71,7 +76,8 @@ fn main() { false => { let engine_tree_config = TreeConfig::default() .with_persistence_threshold(engine_args.persistence_threshold) - .with_memory_block_buffer_target(engine_args.memory_block_buffer_target); + .with_memory_block_buffer_target(engine_args.memory_block_buffer_target) + .with_state_root_task(engine_args.state_root_task_enabled); let handle = builder .with_types_and_provider::>() .with_components(EthereumNode::components()) diff --git a/book/cli/reth/node.md b/book/cli/reth/node.md index d8465d253..c07585c44 100644 --- a/book/cli/reth/node.md +++ b/book/cli/reth/node.md @@ -700,6 +700,9 @@ Engine: [default: 2] + --engine.state-root-task + Enable state root task + Logging: --log.stdout.format The format to use for logs written to stdout diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index 5f51e5d31..5ba7c2306 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -32,6 +32,7 @@ reth-prune.workspace = true reth-revm.workspace = true reth-stages-api.workspace = true reth-tasks.workspace = true +reth-trie-db.workspace = true reth-trie-parallel.workspace = true reth-trie-sparse.workspace = true reth-trie.workspace = true @@ -82,7 +83,6 @@ reth-static-file.workspace = true reth-testing-utils.workspace = true reth-tracing.workspace = true reth-trie-db.workspace = true -proptest.workspace = true # alloy alloy-rlp.workspace = true @@ -90,6 +90,7 @@ alloy-rlp.workspace = true assert_matches.workspace = true criterion.workspace = true crossbeam-channel = "0.5.13" +proptest.workspace = true rand.workspace = true [[bench]] diff --git a/crates/engine/tree/src/tree/config.rs b/crates/engine/tree/src/tree/config.rs index d252b65a8..34a6e4d00 100644 --- a/crates/engine/tree/src/tree/config.rs +++ b/crates/engine/tree/src/tree/config.rs @@ -32,6 +32,8 @@ pub struct TreeConfig { /// This is used as a cutoff to prevent long-running sequential block execution when we receive /// a batch of downloaded blocks. max_execute_block_batch_size: usize, + /// Whether to use the new state root task calculation method instead of parallel calculation + use_state_root_task: bool, } impl Default for TreeConfig { @@ -42,6 +44,7 @@ impl Default for TreeConfig { block_buffer_limit: DEFAULT_BLOCK_BUFFER_LIMIT, max_invalid_header_cache_length: DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH, max_execute_block_batch_size: DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE, + use_state_root_task: false, } } } @@ -54,6 +57,7 @@ impl TreeConfig { block_buffer_limit: u32, max_invalid_header_cache_length: u32, max_execute_block_batch_size: usize, + use_state_root_task: bool, ) -> Self { Self { persistence_threshold, @@ -61,6 +65,7 @@ impl TreeConfig { block_buffer_limit, max_invalid_header_cache_length, max_execute_block_batch_size, + use_state_root_task, } } @@ -89,6 +94,11 @@ impl TreeConfig { self.max_execute_block_batch_size } + /// Returns whether to use the state root task calculation method. + pub const fn use_state_root_task(&self) -> bool { + self.use_state_root_task + } + /// Setter for persistence threshold. pub const fn with_persistence_threshold(mut self, persistence_threshold: u64) -> Self { self.persistence_threshold = persistence_threshold; @@ -127,4 +137,10 @@ impl TreeConfig { self.max_execute_block_batch_size = max_execute_block_batch_size; self } + + /// Setter for whether to use the new state root task calculation method. + pub const fn with_state_root_task(mut self, use_state_root_task: bool) -> Self { + self.use_state_root_task = use_state_root_task; + self + } } diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 645106c57..074071bb9 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -31,7 +31,7 @@ use reth_engine_primitives::{ EngineValidator, ForkchoiceStateTracker, OnForkChoiceUpdated, }; use reth_errors::{ConsensusError, ProviderResult}; -use reth_evm::execute::BlockExecutorProvider; +use reth_evm::{execute::BlockExecutorProvider, system_calls::OnStateHook}; use reth_payload_builder::PayloadBuilderHandle; use reth_payload_builder_primitives::PayloadBuilder; use reth_payload_primitives::PayloadBuilderAttributes; @@ -41,15 +41,24 @@ use reth_primitives::{ }; use reth_primitives_traits::Block; use reth_provider::{ - providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, ExecutionOutcome, - HashedPostStateProvider, ProviderError, StateCommitmentProvider, StateProviderBox, - StateProviderFactory, StateReader, StateRootProvider, TransactionVariant, + providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, + ExecutionOutcome, HashedPostStateProvider, ProviderError, StateCommitmentProvider, + StateProviderBox, StateProviderFactory, StateReader, StateRootProvider, TransactionVariant, }; use reth_revm::database::StateProviderDatabase; use reth_stages_api::ControlFlow; -use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput}; +use reth_trie::{ + hashed_cursor::HashedPostStateCursorFactory, + prefix_set::TriePrefixSetsMut, + proof::ProofBlindedProviderFactory, + trie_cursor::InMemoryTrieCursorFactory, + updates::{TrieUpdates, TrieUpdatesSorted}, + HashedPostState, HashedPostStateSorted, TrieInput, +}; +use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory}; use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError}; use revm_primitives::EvmState; +use root::{StateRootComputeOutcome, StateRootConfig, StateRootTask}; use std::{ cmp::Ordering, collections::{btree_map, hash_map, BTreeMap, VecDeque}, @@ -463,6 +472,15 @@ pub enum TreeAction { }, } +/// Context used to keep alive the required values when returning a state hook +/// from a scoped thread. +struct StateHookContext

{ + provider_ro: P, + nodes_sorted: Arc, + state_sorted: Arc, + prefix_sets: Arc, +} + /// The engine API tree handler implementation. /// /// This type is responsible for processing engine API requests, maintaining the canonical state and @@ -511,6 +529,8 @@ where invalid_block_hook: Box>, /// The engine API variant of this handler engine_kind: EngineApiKind, + /// state root task thread pool + state_root_task_pool: rayon::ThreadPool, } impl std::fmt::Debug @@ -574,6 +594,15 @@ where ) -> Self { let (incoming_tx, incoming) = std::sync::mpsc::channel(); + let num_threads = + std::thread::available_parallelism().map_or(1, |num| (num.get() / 2).max(1)); + + let state_root_task_pool = rayon::ThreadPoolBuilder::new() + .num_threads(num_threads) + .thread_name(|i| format!("srt-worker-{}", i)) + .build() + .expect("Failed to create proof worker thread pool"); + Self { provider, executor_provider, @@ -592,6 +621,7 @@ where incoming_tx, invalid_block_hook: Box::new(NoopInvalidBlockHook), engine_kind, + state_root_task_pool, } } @@ -2222,110 +2252,155 @@ where let sealed_block = Arc::new(block.block.clone()); let block = block.unseal(); - let exec_time = Instant::now(); - let persistence_not_in_progress = !self.persistence_state.in_progress(); - // TODO: uncomment to use StateRootTask + let state_root_result = std::thread::scope(|scope| { + let (state_root_handle, state_hook) = if persistence_not_in_progress && + self.config.use_state_root_task() + { + let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?; - // let (state_root_handle, state_hook) = if persistence_not_in_progress { - // let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?; - // - // let state_root_config = StateRootConfig::new_from_input( - // consistent_view.clone(), - // self.compute_trie_input(consistent_view, block.header().parent_hash()) - // .map_err(ParallelStateRootError::into)?, - // ); - // - // let provider_ro = consistent_view.provider_ro()?; - // let nodes_sorted = state_root_config.nodes_sorted.clone(); - // let state_sorted = state_root_config.state_sorted.clone(); - // let prefix_sets = state_root_config.prefix_sets.clone(); - // let blinded_provider_factory = ProofBlindedProviderFactory::new( - // InMemoryTrieCursorFactory::new( - // DatabaseTrieCursorFactory::new(provider_ro.tx_ref()), - // &nodes_sorted, - // ), - // HashedPostStateCursorFactory::new( - // DatabaseHashedCursorFactory::new(provider_ro.tx_ref()), - // &state_sorted, - // ), - // prefix_sets, - // ); - // - // let state_root_task = StateRootTask::new(state_root_config, - // blinded_provider_factory); let state_hook = state_root_task.state_hook(); - // (Some(state_root_task.spawn(scope)), Box::new(state_hook) as Box) - // } else { - // (None, Box::new(|_state: &EvmState| {}) as Box) - // }; - let state_hook = Box::new(|_state: &EvmState| {}); + let state_root_config = StateRootConfig::new_from_input( + consistent_view.clone(), + self.compute_trie_input(consistent_view.clone(), block.header().parent_hash()) + .map_err(|e| InsertBlockErrorKindTwo::Other(Box::new(e)))?, + ); - let output = self.metrics.executor.execute_metered(executor, &block, state_hook)?; + let provider_ro = consistent_view.provider_ro()?; + let nodes_sorted = state_root_config.nodes_sorted.clone(); + let state_sorted = state_root_config.state_sorted.clone(); + let prefix_sets = state_root_config.prefix_sets.clone(); - trace!(target: "engine::tree", elapsed=?exec_time.elapsed(), ?block_number, "Executed block"); + // context will hold the values that need to be kept alive + let context = + StateHookContext { provider_ro, nodes_sorted, state_sorted, prefix_sets }; - if let Err(err) = self.consensus.validate_block_post_execution( - &block, - PostExecutionInput::new(&output.receipts, &output.requests), - ) { - // call post-block hook - self.invalid_block_hook.on_invalid_block( - &parent_block, - &block.seal_slow(), - &output, - None, - ); - return Err(err.into()) - } + // it is ok to leak here because we are in a scoped thread, the + // memory will be freed when the thread completes + let context = Box::leak(Box::new(context)); - let hashed_state = self.provider.hashed_post_state(&output.state); + let blinded_provider_factory = ProofBlindedProviderFactory::new( + InMemoryTrieCursorFactory::new( + DatabaseTrieCursorFactory::new(context.provider_ro.tx_ref()), + &context.nodes_sorted, + ), + HashedPostStateCursorFactory::new( + DatabaseHashedCursorFactory::new(context.provider_ro.tx_ref()), + &context.state_sorted, + ), + context.prefix_sets.clone(), + ); - trace!(target: "engine::tree", block=?sealed_block.num_hash(), "Calculating block state root"); - let root_time = Instant::now(); + let state_root_task = StateRootTask::new( + state_root_config, + blinded_provider_factory, + &self.state_root_task_pool, + ); + let state_hook = state_root_task.state_hook(); + (Some(state_root_task.spawn(scope)), Box::new(state_hook) as Box) + } else { + (None, Box::new(|_state: &EvmState| {}) as Box) + }; - // We attempt to compute state root in parallel if we are currently not persisting anything - // to database. This is safe, because the database state cannot change until we - // finish parallel computation. It is important that nothing is being persisted as - // we are computing in parallel, because we initialize a different database transaction - // per thread and it might end up with a different view of the database. - let state_root_result = if persistence_not_in_progress { - // TODO: uncomment to use StateRootTask + let execution_start = Instant::now(); + let output = self.metrics.executor.execute_metered(executor, &block, state_hook)?; + let execution_time = execution_start.elapsed(); + trace!(target: "engine::tree", elapsed = ?execution_time, ?block_number, "Executed block"); - // if let Some(state_root_handle) = state_root_handle { - // match state_root_handle.wait_for_result() { - // Ok((task_state_root, task_trie_updates)) => { - // info!( - // target: "engine::tree", - // block = ?sealed_block.num_hash(), - // ?task_state_root, - // "State root task finished" - // ); - // } - // Err(error) => { - // info!(target: "engine::tree", ?error, "Failed to wait for state root task - // result"); } - // } - // } - - match self.compute_state_root_parallel(block.header().parent_hash(), &hashed_state) { - Ok(result) => Some(result), - Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => { - debug!(target: "engine", %error, "Parallel state root computation failed consistency check, falling back"); - None - } - Err(error) => return Err(InsertBlockErrorKindTwo::Other(Box::new(error))), + if let Err(err) = self.consensus.validate_block_post_execution( + &block, + PostExecutionInput::new(&output.receipts, &output.requests), + ) { + // call post-block hook + self.invalid_block_hook.on_invalid_block( + &parent_block, + &block.clone().seal_slow(), + &output, + None, + ); + return Err(err.into()) } - } else { - None - }; - let (state_root, trie_output) = if let Some(result) = state_root_result { - result - } else { - debug!(target: "engine::tree", block=?sealed_block.num_hash(), ?persistence_not_in_progress, "Failed to compute state root in parallel"); - state_provider.state_root_with_updates(hashed_state.clone())? - }; + let hashed_state = self.provider.hashed_post_state(&output.state); + + trace!(target: "engine::tree", block=?sealed_block.num_hash(), "Calculating block state root"); + let root_time = Instant::now(); + + // We attempt to compute state root in parallel if we are currently not persisting + // anything to database. This is safe, because the database state cannot + // change until we finish parallel computation. It is important that nothing + // is being persisted as we are computing in parallel, because we initialize + // a different database transaction per thread and it might end up with a + // different view of the database. + let (state_root, trie_updates, root_elapsed) = if persistence_not_in_progress { + if self.config.use_state_root_task() { + match state_root_handle + .expect("state root handle must exist if use_state_root_task is true") + .wait_for_result() + { + Ok(StateRootComputeOutcome { + state_root: (task_state_root, task_trie_updates), + time_from_last_update, + .. + }) => { + info!( + target: "engine::tree", + block = ?sealed_block.num_hash(), + ?task_state_root, + task_elapsed = ?time_from_last_update, + "Task state root finished" + ); + (task_state_root, task_trie_updates, time_from_last_update) + } + Err(error) => { + info!(target: "engine::tree", ?error, "Failed to wait for state root task result"); + // Fall back to sequential calculation + let (root, updates) = + state_provider.state_root_with_updates(hashed_state.clone())?; + (root, updates, root_time.elapsed()) + } + } + } else { + match self + .compute_state_root_parallel(block.header().parent_hash(), &hashed_state) + { + Ok(result) => { + info!( + target: "engine::tree", + block = ?sealed_block.num_hash(), + regular_state_root = ?result.0, + "Regular root task finished" + ); + (result.0, result.1, root_time.elapsed()) + } + Err(ParallelStateRootError::Provider(ProviderError::ConsistentView( + error, + ))) => { + debug!(target: "engine", %error, "Parallel state root computation failed consistency check, falling back"); + let (root, updates) = + state_provider.state_root_with_updates(hashed_state.clone())?; + (root, updates, root_time.elapsed()) + } + Err(error) => return Err(InsertBlockErrorKindTwo::Other(Box::new(error))), + } + } + } else { + debug!(target: "engine::tree", block=?sealed_block.num_hash(), ?persistence_not_in_progress, "Failed to compute state root in parallel"); + let (root, updates) = + state_provider.state_root_with_updates(hashed_state.clone())?; + (root, updates, root_time.elapsed()) + }; + + Result::<_, InsertBlockErrorKindTwo>::Ok(( + state_root, + trie_updates, + hashed_state, + output, + root_elapsed, + )) + })?; + + let (state_root, trie_output, hashed_state, output, root_elapsed) = state_root_result; if state_root != block.header().state_root() { // call post-block hook @@ -2341,7 +2416,6 @@ where .into()) } - let root_elapsed = root_time.elapsed(); self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64()); debug!(target: "engine::tree", ?root_elapsed, block=?sealed_block.num_hash(), "Calculated state root");