feat(engine): wire StateRootTask in EngineApiTreeHandler (#12639)

Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
This commit is contained in:
Federico Gimenez
2025-01-07 11:27:16 +01:00
committed by GitHub
parent 3212af2d85
commit 107dfaeaa9
5 changed files with 200 additions and 100 deletions

View File

@ -39,6 +39,10 @@ pub struct EngineArgs {
/// Configure the target number of blocks to keep in memory. /// Configure the target number of blocks to keep in memory.
#[arg(long = "engine.memory-block-buffer-target", conflicts_with = "legacy", default_value_t = DEFAULT_MEMORY_BLOCK_BUFFER_TARGET)] #[arg(long = "engine.memory-block-buffer-target", conflicts_with = "legacy", default_value_t = DEFAULT_MEMORY_BLOCK_BUFFER_TARGET)]
pub memory_block_buffer_target: u64, pub memory_block_buffer_target: u64,
/// Enable state root task
#[arg(long = "engine.state-root-task", conflicts_with = "legacy")]
pub state_root_task_enabled: bool,
} }
impl Default for EngineArgs { impl Default for EngineArgs {
@ -48,6 +52,7 @@ impl Default for EngineArgs {
legacy: false, legacy: false,
persistence_threshold: DEFAULT_PERSISTENCE_THRESHOLD, persistence_threshold: DEFAULT_PERSISTENCE_THRESHOLD,
memory_block_buffer_target: DEFAULT_MEMORY_BLOCK_BUFFER_TARGET, memory_block_buffer_target: DEFAULT_MEMORY_BLOCK_BUFFER_TARGET,
state_root_task_enabled: false,
} }
} }
} }
@ -71,7 +76,8 @@ fn main() {
false => { false => {
let engine_tree_config = TreeConfig::default() let engine_tree_config = TreeConfig::default()
.with_persistence_threshold(engine_args.persistence_threshold) .with_persistence_threshold(engine_args.persistence_threshold)
.with_memory_block_buffer_target(engine_args.memory_block_buffer_target); .with_memory_block_buffer_target(engine_args.memory_block_buffer_target)
.with_state_root_task(engine_args.state_root_task_enabled);
let handle = builder let handle = builder
.with_types_and_provider::<EthereumNode, BlockchainProvider2<_>>() .with_types_and_provider::<EthereumNode, BlockchainProvider2<_>>()
.with_components(EthereumNode::components()) .with_components(EthereumNode::components())

View File

@ -700,6 +700,9 @@ Engine:
[default: 2] [default: 2]
--engine.state-root-task
Enable state root task
Logging: Logging:
--log.stdout.format <FORMAT> --log.stdout.format <FORMAT>
The format to use for logs written to stdout The format to use for logs written to stdout

View File

@ -32,6 +32,7 @@ reth-prune.workspace = true
reth-revm.workspace = true reth-revm.workspace = true
reth-stages-api.workspace = true reth-stages-api.workspace = true
reth-tasks.workspace = true reth-tasks.workspace = true
reth-trie-db.workspace = true
reth-trie-parallel.workspace = true reth-trie-parallel.workspace = true
reth-trie-sparse.workspace = true reth-trie-sparse.workspace = true
reth-trie.workspace = true reth-trie.workspace = true
@ -82,7 +83,6 @@ reth-static-file.workspace = true
reth-testing-utils.workspace = true reth-testing-utils.workspace = true
reth-tracing.workspace = true reth-tracing.workspace = true
reth-trie-db.workspace = true reth-trie-db.workspace = true
proptest.workspace = true
# alloy # alloy
alloy-rlp.workspace = true alloy-rlp.workspace = true
@ -90,6 +90,7 @@ alloy-rlp.workspace = true
assert_matches.workspace = true assert_matches.workspace = true
criterion.workspace = true criterion.workspace = true
crossbeam-channel = "0.5.13" crossbeam-channel = "0.5.13"
proptest.workspace = true
rand.workspace = true rand.workspace = true
[[bench]] [[bench]]

View File

@ -32,6 +32,8 @@ pub struct TreeConfig {
/// This is used as a cutoff to prevent long-running sequential block execution when we receive /// This is used as a cutoff to prevent long-running sequential block execution when we receive
/// a batch of downloaded blocks. /// a batch of downloaded blocks.
max_execute_block_batch_size: usize, max_execute_block_batch_size: usize,
/// Whether to use the new state root task calculation method instead of parallel calculation
use_state_root_task: bool,
} }
impl Default for TreeConfig { impl Default for TreeConfig {
@ -42,6 +44,7 @@ impl Default for TreeConfig {
block_buffer_limit: DEFAULT_BLOCK_BUFFER_LIMIT, block_buffer_limit: DEFAULT_BLOCK_BUFFER_LIMIT,
max_invalid_header_cache_length: DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH, max_invalid_header_cache_length: DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH,
max_execute_block_batch_size: DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE, max_execute_block_batch_size: DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE,
use_state_root_task: false,
} }
} }
} }
@ -54,6 +57,7 @@ impl TreeConfig {
block_buffer_limit: u32, block_buffer_limit: u32,
max_invalid_header_cache_length: u32, max_invalid_header_cache_length: u32,
max_execute_block_batch_size: usize, max_execute_block_batch_size: usize,
use_state_root_task: bool,
) -> Self { ) -> Self {
Self { Self {
persistence_threshold, persistence_threshold,
@ -61,6 +65,7 @@ impl TreeConfig {
block_buffer_limit, block_buffer_limit,
max_invalid_header_cache_length, max_invalid_header_cache_length,
max_execute_block_batch_size, max_execute_block_batch_size,
use_state_root_task,
} }
} }
@ -89,6 +94,11 @@ impl TreeConfig {
self.max_execute_block_batch_size self.max_execute_block_batch_size
} }
/// Returns whether to use the state root task calculation method.
///
/// This is a plain read of the `use_state_root_task` field. The field is `false` in
/// [`TreeConfig::default`] and can be changed with [`TreeConfig::with_state_root_task`].
pub const fn use_state_root_task(&self) -> bool {
self.use_state_root_task
}
/// Setter for persistence threshold. /// Setter for persistence threshold.
pub const fn with_persistence_threshold(mut self, persistence_threshold: u64) -> Self { pub const fn with_persistence_threshold(mut self, persistence_threshold: u64) -> Self {
self.persistence_threshold = persistence_threshold; self.persistence_threshold = persistence_threshold;
@ -127,4 +137,10 @@ impl TreeConfig {
self.max_execute_block_batch_size = max_execute_block_batch_size; self.max_execute_block_batch_size = max_execute_block_batch_size;
self self
} }
/// Setter for whether to use the new state root task calculation method.
///
/// Takes the config by value, overwrites `use_state_root_task` with the given flag and
/// returns the config, so the call can be chained with the other `with_*` setters.
pub const fn with_state_root_task(mut self, use_state_root_task: bool) -> Self {
self.use_state_root_task = use_state_root_task;
self
}
} }

View File

@ -31,7 +31,7 @@ use reth_engine_primitives::{
EngineValidator, ForkchoiceStateTracker, OnForkChoiceUpdated, EngineValidator, ForkchoiceStateTracker, OnForkChoiceUpdated,
}; };
use reth_errors::{ConsensusError, ProviderResult}; use reth_errors::{ConsensusError, ProviderResult};
use reth_evm::execute::BlockExecutorProvider; use reth_evm::{execute::BlockExecutorProvider, system_calls::OnStateHook};
use reth_payload_builder::PayloadBuilderHandle; use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_builder_primitives::PayloadBuilder; use reth_payload_builder_primitives::PayloadBuilder;
use reth_payload_primitives::PayloadBuilderAttributes; use reth_payload_primitives::PayloadBuilderAttributes;
@ -41,15 +41,24 @@ use reth_primitives::{
}; };
use reth_primitives_traits::Block; use reth_primitives_traits::Block;
use reth_provider::{ use reth_provider::{
providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, ExecutionOutcome, providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory,
HashedPostStateProvider, ProviderError, StateCommitmentProvider, StateProviderBox, ExecutionOutcome, HashedPostStateProvider, ProviderError, StateCommitmentProvider,
StateProviderFactory, StateReader, StateRootProvider, TransactionVariant, StateProviderBox, StateProviderFactory, StateReader, StateRootProvider, TransactionVariant,
}; };
use reth_revm::database::StateProviderDatabase; use reth_revm::database::StateProviderDatabase;
use reth_stages_api::ControlFlow; use reth_stages_api::ControlFlow;
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput}; use reth_trie::{
hashed_cursor::HashedPostStateCursorFactory,
prefix_set::TriePrefixSetsMut,
proof::ProofBlindedProviderFactory,
trie_cursor::InMemoryTrieCursorFactory,
updates::{TrieUpdates, TrieUpdatesSorted},
HashedPostState, HashedPostStateSorted, TrieInput,
};
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError}; use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
use revm_primitives::EvmState; use revm_primitives::EvmState;
use root::{StateRootComputeOutcome, StateRootConfig, StateRootTask};
use std::{ use std::{
cmp::Ordering, cmp::Ordering,
collections::{btree_map, hash_map, BTreeMap, VecDeque}, collections::{btree_map, hash_map, BTreeMap, VecDeque},
@ -463,6 +472,15 @@ pub enum TreeAction {
}, },
} }
/// Context used to keep alive the required values when returning a state hook
/// from a scoped thread.
///
/// The cursor factories handed to `ProofBlindedProviderFactory::new` are built from
/// references into this struct, so the struct has to outlive them.
struct StateHookContext<P> {
/// Provider obtained from `ConsistentDbView::provider_ro`; its `tx_ref()` is passed to
/// `DatabaseTrieCursorFactory::new` and `DatabaseHashedCursorFactory::new`.
provider_ro: P,
/// Sorted trie updates cloned from the `StateRootConfig`; passed by reference to
/// `InMemoryTrieCursorFactory::new`.
nodes_sorted: Arc<TrieUpdatesSorted>,
/// Sorted hashed post state cloned from the `StateRootConfig`; passed by reference to
/// `HashedPostStateCursorFactory::new`.
state_sorted: Arc<HashedPostStateSorted>,
/// Prefix sets cloned from the `StateRootConfig`; a clone of this `Arc` is passed to
/// `ProofBlindedProviderFactory::new`.
prefix_sets: Arc<TriePrefixSetsMut>,
}
/// The engine API tree handler implementation. /// The engine API tree handler implementation.
/// ///
/// This type is responsible for processing engine API requests, maintaining the canonical state and /// This type is responsible for processing engine API requests, maintaining the canonical state and
@ -511,6 +529,8 @@ where
invalid_block_hook: Box<dyn InvalidBlockHook<N>>, invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
/// The engine API variant of this handler /// The engine API variant of this handler
engine_kind: EngineApiKind, engine_kind: EngineApiKind,
/// Thread pool handed to the state root task when it is created.
state_root_task_pool: rayon::ThreadPool,
} }
impl<N, P: Debug, E: Debug, T: EngineTypes + Debug, V: Debug> std::fmt::Debug impl<N, P: Debug, E: Debug, T: EngineTypes + Debug, V: Debug> std::fmt::Debug
@ -574,6 +594,15 @@ where
) -> Self { ) -> Self {
let (incoming_tx, incoming) = std::sync::mpsc::channel(); let (incoming_tx, incoming) = std::sync::mpsc::channel();
let num_threads =
std::thread::available_parallelism().map_or(1, |num| (num.get() / 2).max(1));
let state_root_task_pool = rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.thread_name(|i| format!("srt-worker-{}", i))
.build()
.expect("Failed to create proof worker thread pool");
Self { Self {
provider, provider,
executor_provider, executor_provider,
@ -592,6 +621,7 @@ where
incoming_tx, incoming_tx,
invalid_block_hook: Box::new(NoopInvalidBlockHook), invalid_block_hook: Box::new(NoopInvalidBlockHook),
engine_kind, engine_kind,
state_root_task_pool,
} }
} }
@ -2222,110 +2252,155 @@ where
let sealed_block = Arc::new(block.block.clone()); let sealed_block = Arc::new(block.block.clone());
let block = block.unseal(); let block = block.unseal();
let exec_time = Instant::now();
let persistence_not_in_progress = !self.persistence_state.in_progress(); let persistence_not_in_progress = !self.persistence_state.in_progress();
// TODO: uncomment to use StateRootTask let state_root_result = std::thread::scope(|scope| {
let (state_root_handle, state_hook) = if persistence_not_in_progress &&
self.config.use_state_root_task()
{
let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
// let (state_root_handle, state_hook) = if persistence_not_in_progress { let state_root_config = StateRootConfig::new_from_input(
// let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?; consistent_view.clone(),
// self.compute_trie_input(consistent_view.clone(), block.header().parent_hash())
// let state_root_config = StateRootConfig::new_from_input( .map_err(|e| InsertBlockErrorKindTwo::Other(Box::new(e)))?,
// consistent_view.clone(), );
// self.compute_trie_input(consistent_view, block.header().parent_hash())
// .map_err(ParallelStateRootError::into)?,
// );
//
// let provider_ro = consistent_view.provider_ro()?;
// let nodes_sorted = state_root_config.nodes_sorted.clone();
// let state_sorted = state_root_config.state_sorted.clone();
// let prefix_sets = state_root_config.prefix_sets.clone();
// let blinded_provider_factory = ProofBlindedProviderFactory::new(
// InMemoryTrieCursorFactory::new(
// DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
// &nodes_sorted,
// ),
// HashedPostStateCursorFactory::new(
// DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
// &state_sorted,
// ),
// prefix_sets,
// );
//
// let state_root_task = StateRootTask::new(state_root_config,
// blinded_provider_factory); let state_hook = state_root_task.state_hook();
// (Some(state_root_task.spawn(scope)), Box::new(state_hook) as Box<dyn OnStateHook>)
// } else {
// (None, Box::new(|_state: &EvmState| {}) as Box<dyn OnStateHook>)
// };
let state_hook = Box::new(|_state: &EvmState| {});
let output = self.metrics.executor.execute_metered(executor, &block, state_hook)?; let provider_ro = consistent_view.provider_ro()?;
let nodes_sorted = state_root_config.nodes_sorted.clone();
let state_sorted = state_root_config.state_sorted.clone();
let prefix_sets = state_root_config.prefix_sets.clone();
trace!(target: "engine::tree", elapsed=?exec_time.elapsed(), ?block_number, "Executed block"); // context will hold the values that need to be kept alive
let context =
StateHookContext { provider_ro, nodes_sorted, state_sorted, prefix_sets };
if let Err(err) = self.consensus.validate_block_post_execution( // NOTE: `Box::leak` is used so the references can outlive this stack frame, but
&block, // the leaked allocation is never freed: one context is leaked per call
PostExecutionInput::new(&output.receipts, &output.requests), let context = Box::leak(Box::new(context));
) {
// call post-block hook
self.invalid_block_hook.on_invalid_block(
&parent_block,
&block.seal_slow(),
&output,
None,
);
return Err(err.into())
}
let hashed_state = self.provider.hashed_post_state(&output.state); let blinded_provider_factory = ProofBlindedProviderFactory::new(
InMemoryTrieCursorFactory::new(
DatabaseTrieCursorFactory::new(context.provider_ro.tx_ref()),
&context.nodes_sorted,
),
HashedPostStateCursorFactory::new(
DatabaseHashedCursorFactory::new(context.provider_ro.tx_ref()),
&context.state_sorted,
),
context.prefix_sets.clone(),
);
trace!(target: "engine::tree", block=?sealed_block.num_hash(), "Calculating block state root"); let state_root_task = StateRootTask::new(
let root_time = Instant::now(); state_root_config,
blinded_provider_factory,
&self.state_root_task_pool,
);
let state_hook = state_root_task.state_hook();
(Some(state_root_task.spawn(scope)), Box::new(state_hook) as Box<dyn OnStateHook>)
} else {
(None, Box::new(|_state: &EvmState| {}) as Box<dyn OnStateHook>)
};
// We attempt to compute state root in parallel if we are currently not persisting anything let execution_start = Instant::now();
// to database. This is safe, because the database state cannot change until we let output = self.metrics.executor.execute_metered(executor, &block, state_hook)?;
// finish parallel computation. It is important that nothing is being persisted as let execution_time = execution_start.elapsed();
// we are computing in parallel, because we initialize a different database transaction trace!(target: "engine::tree", elapsed = ?execution_time, ?block_number, "Executed block");
// per thread and it might end up with a different view of the database.
let state_root_result = if persistence_not_in_progress {
// TODO: uncomment to use StateRootTask
// if let Some(state_root_handle) = state_root_handle { if let Err(err) = self.consensus.validate_block_post_execution(
// match state_root_handle.wait_for_result() { &block,
// Ok((task_state_root, task_trie_updates)) => { PostExecutionInput::new(&output.receipts, &output.requests),
// info!( ) {
// target: "engine::tree", // call post-block hook
// block = ?sealed_block.num_hash(), self.invalid_block_hook.on_invalid_block(
// ?task_state_root, &parent_block,
// "State root task finished" &block.clone().seal_slow(),
// ); &output,
// } None,
// Err(error) => { );
// info!(target: "engine::tree", ?error, "Failed to wait for state root task return Err(err.into())
// result"); }
// }
// }
match self.compute_state_root_parallel(block.header().parent_hash(), &hashed_state) {
Ok(result) => Some(result),
Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => {
debug!(target: "engine", %error, "Parallel state root computation failed consistency check, falling back");
None
}
Err(error) => return Err(InsertBlockErrorKindTwo::Other(Box::new(error))),
} }
} else {
None
};
let (state_root, trie_output) = if let Some(result) = state_root_result { let hashed_state = self.provider.hashed_post_state(&output.state);
result
} else { trace!(target: "engine::tree", block=?sealed_block.num_hash(), "Calculating block state root");
debug!(target: "engine::tree", block=?sealed_block.num_hash(), ?persistence_not_in_progress, "Failed to compute state root in parallel"); let root_time = Instant::now();
state_provider.state_root_with_updates(hashed_state.clone())?
}; // We attempt to compute state root in parallel if we are currently not persisting
// anything to database. This is safe, because the database state cannot
// change until we finish parallel computation. It is important that nothing
// is being persisted as we are computing in parallel, because we initialize
// a different database transaction per thread and it might end up with a
// different view of the database.
let (state_root, trie_updates, root_elapsed) = if persistence_not_in_progress {
if self.config.use_state_root_task() {
match state_root_handle
.expect("state root handle must exist if use_state_root_task is true")
.wait_for_result()
{
Ok(StateRootComputeOutcome {
state_root: (task_state_root, task_trie_updates),
time_from_last_update,
..
}) => {
info!(
target: "engine::tree",
block = ?sealed_block.num_hash(),
?task_state_root,
task_elapsed = ?time_from_last_update,
"Task state root finished"
);
(task_state_root, task_trie_updates, time_from_last_update)
}
Err(error) => {
info!(target: "engine::tree", ?error, "Failed to wait for state root task result");
// Fall back to sequential calculation
let (root, updates) =
state_provider.state_root_with_updates(hashed_state.clone())?;
(root, updates, root_time.elapsed())
}
}
} else {
match self
.compute_state_root_parallel(block.header().parent_hash(), &hashed_state)
{
Ok(result) => {
info!(
target: "engine::tree",
block = ?sealed_block.num_hash(),
regular_state_root = ?result.0,
"Regular root task finished"
);
(result.0, result.1, root_time.elapsed())
}
Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(
error,
))) => {
debug!(target: "engine", %error, "Parallel state root computation failed consistency check, falling back");
let (root, updates) =
state_provider.state_root_with_updates(hashed_state.clone())?;
(root, updates, root_time.elapsed())
}
Err(error) => return Err(InsertBlockErrorKindTwo::Other(Box::new(error))),
}
}
} else {
debug!(target: "engine::tree", block=?sealed_block.num_hash(), ?persistence_not_in_progress, "Failed to compute state root in parallel");
let (root, updates) =
state_provider.state_root_with_updates(hashed_state.clone())?;
(root, updates, root_time.elapsed())
};
Result::<_, InsertBlockErrorKindTwo>::Ok((
state_root,
trie_updates,
hashed_state,
output,
root_elapsed,
))
})?;
let (state_root, trie_output, hashed_state, output, root_elapsed) = state_root_result;
if state_root != block.header().state_root() { if state_root != block.header().state_root() {
// call post-block hook // call post-block hook
@ -2341,7 +2416,6 @@ where
.into()) .into())
} }
let root_elapsed = root_time.elapsed();
self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64()); self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
debug!(target: "engine::tree", ?root_elapsed, block=?sealed_block.num_hash(), "Calculated state root"); debug!(target: "engine::tree", ?root_elapsed, block=?sealed_block.num_hash(), "Calculated state root");