mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
perf: warm transactions in parallel (#13759)
This commit is contained in:
@ -72,6 +72,7 @@ reth-chain-state = { workspace = true, features = ["test-utils"] }
|
||||
reth-chainspec.workspace = true
|
||||
reth-ethereum-engine-primitives.workspace = true
|
||||
reth-ethereum-consensus.workspace = true
|
||||
reth-evm-ethereum.workspace = true
|
||||
reth-evm = { workspace = true, features = ["test-utils"] }
|
||||
reth-exex-types.workspace = true
|
||||
reth-network-p2p = { workspace = true, features = ["test-utils"] }
|
||||
|
||||
@ -15,7 +15,7 @@ use reth_trie::{
|
||||
};
|
||||
use revm_primitives::map::DefaultHashBuilder;
|
||||
|
||||
type Cache<K, V> = moka::sync::Cache<K, V, alloy_primitives::map::DefaultHashBuilder>;
|
||||
pub(crate) type Cache<K, V> = moka::sync::Cache<K, V, alloy_primitives::map::DefaultHashBuilder>;
|
||||
|
||||
/// A wrapper of a state provider and a shared cache.
|
||||
pub(crate) struct CachedStateProvider<S> {
|
||||
|
||||
@ -11,13 +11,15 @@ use crate::{
|
||||
use alloy_consensus::BlockHeader;
|
||||
use alloy_eips::BlockNumHash;
|
||||
use alloy_primitives::{
|
||||
map::{HashMap, HashSet},
|
||||
BlockNumber, B256, U256,
|
||||
keccak256,
|
||||
map::{B256Set, HashMap, HashSet},
|
||||
Address, BlockNumber, B256, U256,
|
||||
};
|
||||
use alloy_rpc_types_engine::{
|
||||
ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
|
||||
};
|
||||
use block_buffer::BlockBuffer;
|
||||
use cached_state::ProviderCaches;
|
||||
use error::{InsertBlockError, InsertBlockErrorKind, InsertBlockFatalError};
|
||||
use persistence_state::CurrentPersistenceAction;
|
||||
use reth_chain_state::{
|
||||
@ -35,26 +37,32 @@ use reth_ethereum_primitives::EthPrimitives;
|
||||
use reth_evm::{
|
||||
execute::BlockExecutorProvider,
|
||||
system_calls::{NoopHook, OnStateHook},
|
||||
ConfigureEvm, Evm, TransactionEnv,
|
||||
};
|
||||
use reth_payload_builder::PayloadBuilderHandle;
|
||||
use reth_payload_builder_primitives::PayloadBuilder;
|
||||
use reth_payload_primitives::{EngineApiMessageVersion, PayloadBuilderAttributes};
|
||||
use reth_primitives_traits::{
|
||||
Block, GotExpected, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
|
||||
Block, BlockBody, GotExpected, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
|
||||
SignedTransaction,
|
||||
};
|
||||
use reth_provider::{
|
||||
providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory,
|
||||
ExecutionOutcome, HashedPostStateProvider, ProviderError, StateCommitmentProvider,
|
||||
StateProviderBox, StateProviderFactory, StateReader, StateRootProvider, TransactionVariant,
|
||||
};
|
||||
use reth_revm::database::StateProviderDatabase;
|
||||
use reth_revm::{cancelled::Cancelled, database::StateProviderDatabase};
|
||||
use reth_stages_api::ControlFlow;
|
||||
use reth_trie::{
|
||||
trie_cursor::InMemoryTrieCursorFactory, updates::TrieUpdates, HashedPostState, TrieInput,
|
||||
trie_cursor::InMemoryTrieCursorFactory, updates::TrieUpdates, HashedPostState,
|
||||
MultiProofTargets, TrieInput,
|
||||
};
|
||||
use reth_trie_db::DatabaseTrieCursorFactory;
|
||||
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
|
||||
use root::{StateRootComputeOutcome, StateRootConfig, StateRootHandle, StateRootTask};
|
||||
use revm_primitives::ResultAndState;
|
||||
use root::{
|
||||
StateRootComputeOutcome, StateRootConfig, StateRootHandle, StateRootMessage, StateRootTask,
|
||||
};
|
||||
use std::{
|
||||
cmp::Ordering,
|
||||
collections::{btree_map, hash_map, BTreeMap, VecDeque},
|
||||
@ -518,13 +526,14 @@ pub enum TreeAction {
|
||||
///
|
||||
/// This type is responsible for processing engine API requests, maintaining the canonical state and
|
||||
/// emitting events.
|
||||
pub struct EngineApiTreeHandler<N, P, E, T, V>
|
||||
pub struct EngineApiTreeHandler<N, P, E, T, V, C>
|
||||
where
|
||||
N: NodePrimitives,
|
||||
T: EngineTypes,
|
||||
{
|
||||
provider: P,
|
||||
executor_provider: E,
|
||||
evm_config: C,
|
||||
consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
|
||||
payload_validator: V,
|
||||
/// Keeps track of internals such as executed and buffered blocks.
|
||||
@ -562,18 +571,19 @@ where
|
||||
invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
|
||||
/// The engine API variant of this handler
|
||||
engine_kind: EngineApiKind,
|
||||
/// state root task thread pool
|
||||
state_root_task_pool: Arc<rayon::ThreadPool>,
|
||||
/// Thread pool used for the state root task and prewarming
|
||||
thread_pool: Arc<rayon::ThreadPool>,
|
||||
}
|
||||
|
||||
impl<N, P: Debug, E: Debug, T: EngineTypes + Debug, V: Debug> std::fmt::Debug
|
||||
for EngineApiTreeHandler<N, P, E, T, V>
|
||||
impl<N, P: Debug, E: Debug, T: EngineTypes + Debug, V: Debug, C: Debug> std::fmt::Debug
|
||||
for EngineApiTreeHandler<N, P, E, T, V, C>
|
||||
where
|
||||
N: NodePrimitives,
|
||||
{
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("EngineApiTreeHandler")
|
||||
.field("provider", &self.provider)
|
||||
.field("evm_config", &self.evm_config)
|
||||
.field("executor_provider", &self.executor_provider)
|
||||
.field("consensus", &self.consensus)
|
||||
.field("payload_validator", &self.payload_validator)
|
||||
@ -592,7 +602,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<N, P, E, T, V> EngineApiTreeHandler<N, P, E, T, V>
|
||||
impl<N, P, E, T, V, C> EngineApiTreeHandler<N, P, E, T, V, C>
|
||||
where
|
||||
N: NodePrimitives,
|
||||
P: DatabaseProviderFactory
|
||||
@ -606,6 +616,7 @@ where
|
||||
<P as DatabaseProviderFactory>::Provider:
|
||||
BlockReader<Block = N::Block, Header = N::BlockHeader>,
|
||||
E: BlockExecutorProvider<Primitives = N>,
|
||||
C: ConfigureEvm<Header = N::BlockHeader, Transaction = N::SignedTx>,
|
||||
T: EngineTypes,
|
||||
V: EngineValidator<T, Block = N::Block>,
|
||||
{
|
||||
@ -624,12 +635,13 @@ where
|
||||
payload_builder: PayloadBuilderHandle<T>,
|
||||
config: TreeConfig,
|
||||
engine_kind: EngineApiKind,
|
||||
evm_config: C,
|
||||
) -> Self {
|
||||
let (incoming_tx, incoming) = std::sync::mpsc::channel();
|
||||
|
||||
let num_threads = root::thread_pool_size();
|
||||
|
||||
let state_root_task_pool = Arc::new(
|
||||
let thread_pool = Arc::new(
|
||||
rayon::ThreadPoolBuilder::new()
|
||||
.num_threads(num_threads)
|
||||
.thread_name(|i| format!("srt-worker-{}", i))
|
||||
@ -640,6 +652,7 @@ where
|
||||
Self {
|
||||
provider,
|
||||
executor_provider,
|
||||
evm_config,
|
||||
consensus,
|
||||
payload_validator,
|
||||
incoming,
|
||||
@ -655,7 +668,7 @@ where
|
||||
incoming_tx,
|
||||
invalid_block_hook: Box::new(NoopInvalidBlockHook),
|
||||
engine_kind,
|
||||
state_root_task_pool,
|
||||
thread_pool,
|
||||
}
|
||||
}
|
||||
|
||||
@ -681,6 +694,7 @@ where
|
||||
config: TreeConfig,
|
||||
invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
|
||||
kind: EngineApiKind,
|
||||
evm_config: C,
|
||||
) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
|
||||
{
|
||||
let best_block_number = provider.best_block_number().unwrap_or(0);
|
||||
@ -712,6 +726,7 @@ where
|
||||
payload_builder,
|
||||
config,
|
||||
kind,
|
||||
evm_config,
|
||||
);
|
||||
task.set_invalid_block_hook(invalid_block_hook);
|
||||
let incoming = task.incoming_tx.clone();
|
||||
@ -2347,17 +2362,6 @@ where
|
||||
return Err(e.into())
|
||||
}
|
||||
|
||||
// Use cached state provider before executing, this does nothing currently, will be used in
|
||||
// prewarming
|
||||
let caches = ProviderCacheBuilder::default().build_caches();
|
||||
let cache_metrics = CachedStateMetrics::zeroed();
|
||||
let state_provider =
|
||||
CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
|
||||
|
||||
trace!(target: "engine::tree", block=?block_num_hash, "Executing block");
|
||||
|
||||
let executor = self.executor_provider.executor(StateProviderDatabase::new(&state_provider));
|
||||
|
||||
let sealed_block = Arc::new(block.clone_sealed_block());
|
||||
|
||||
// We only run the parallel state root if we are currently persisting blocks that are all
|
||||
@ -2370,7 +2374,10 @@ where
|
||||
let is_descendant_of_persisting_blocks =
|
||||
self.is_descendant_of_persisting_blocks(block.header());
|
||||
|
||||
let (state_root_handle, state_root_task_config, state_hook) =
|
||||
// Atomic bool for letting the prewarm tasks know when to stop
|
||||
let cancel_execution = Cancelled::default();
|
||||
|
||||
let (state_root_handle, state_root_task_config, state_root_sender, state_hook) =
|
||||
if is_descendant_of_persisting_blocks && self.config.use_state_root_task() {
|
||||
let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
|
||||
|
||||
@ -2397,21 +2404,69 @@ where
|
||||
.state_root_config_duration
|
||||
.set(config_elapsed.as_secs_f64());
|
||||
|
||||
let state_root_task = StateRootTask::new(
|
||||
state_root_config.clone(),
|
||||
self.state_root_task_pool.clone(),
|
||||
);
|
||||
let state_root_task =
|
||||
StateRootTask::new(state_root_config.clone(), self.thread_pool.clone());
|
||||
let state_root_sender = state_root_task.state_root_message_sender();
|
||||
let state_hook = Box::new(state_root_task.state_hook()) as Box<dyn OnStateHook>;
|
||||
(Some(state_root_task.spawn()), Some(state_root_config), state_hook)
|
||||
(
|
||||
Some(state_root_task.spawn()),
|
||||
Some(state_root_config),
|
||||
Some(state_root_sender),
|
||||
state_hook,
|
||||
)
|
||||
} else {
|
||||
(None, None, Box::new(NoopHook::default()) as Box<dyn OnStateHook>)
|
||||
(None, None, None, Box::new(NoopHook::default()) as Box<dyn OnStateHook>)
|
||||
};
|
||||
|
||||
// Use cached state provider before executing, used in execution after prewarming threads
|
||||
// complete
|
||||
let caches = ProviderCacheBuilder::default().build_caches();
|
||||
let cache_metrics = CachedStateMetrics::zeroed();
|
||||
let state_provider = CachedStateProvider::new_with_caches(
|
||||
state_provider,
|
||||
caches.clone(),
|
||||
cache_metrics.clone(),
|
||||
);
|
||||
|
||||
if self.config.use_caching_and_prewarming() {
|
||||
debug!(target: "engine::tree", "Spawning prewarm threads");
|
||||
let prewarm_start = Instant::now();
|
||||
|
||||
// Prewarm transactions
|
||||
for (tx_idx, (tx, sender)) in
|
||||
block.body().transactions().iter().zip(block.senders()).enumerate()
|
||||
{
|
||||
let state_root_sender = state_root_sender.clone();
|
||||
|
||||
let start = Instant::now();
|
||||
self.prewarm_transaction(
|
||||
block.header().clone(),
|
||||
tx.clone(),
|
||||
*sender,
|
||||
caches.clone(),
|
||||
cache_metrics.clone(),
|
||||
state_root_sender,
|
||||
cancel_execution.clone(),
|
||||
)?;
|
||||
let elapsed = start.elapsed();
|
||||
debug!(target: "engine::tree", ?tx_idx, elapsed = ?elapsed, "Spawned transaction prewarm");
|
||||
}
|
||||
|
||||
drop(state_root_sender);
|
||||
let elapsed = prewarm_start.elapsed();
|
||||
debug!(target: "engine::tree", ?elapsed, "Done spawning prewarm threads");
|
||||
}
|
||||
trace!(target: "engine::tree", block=?block_num_hash, "Executing block");
|
||||
|
||||
let executor = self.executor_provider.executor(StateProviderDatabase::new(&state_provider));
|
||||
let execution_start = Instant::now();
|
||||
let output = self.metrics.executor.execute_metered(executor, &block, state_hook)?;
|
||||
let execution_time = execution_start.elapsed();
|
||||
trace!(target: "engine::tree", elapsed = ?execution_time, number=?block_num_hash.number, "Executed block");
|
||||
|
||||
// Ensure that prewarm tasks don't send proof messages after state root sender is dropped
|
||||
drop(cancel_execution);
|
||||
|
||||
if let Err(err) = self.consensus.validate_block_post_execution(
|
||||
&block,
|
||||
PostExecutionInput::new(&output.receipts, &output.requests),
|
||||
@ -2573,6 +2628,89 @@ where
|
||||
Ok(input)
|
||||
}
|
||||
|
||||
/// Runs execution for a single transaction, spawning it in the prewarm threadpool.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn prewarm_transaction(
|
||||
&self,
|
||||
block: N::BlockHeader,
|
||||
tx: N::SignedTx,
|
||||
sender: Address,
|
||||
caches: ProviderCaches,
|
||||
cache_metrics: CachedStateMetrics,
|
||||
state_root_sender: Option<Sender<StateRootMessage>>,
|
||||
cancel_execution: Cancelled,
|
||||
) -> Result<(), InsertBlockErrorKind> {
|
||||
let Some(state_provider) = self.state_provider(block.parent_hash())? else {
|
||||
trace!(target: "engine::tree", parent=%block.parent_hash(), "Could not get state provider for prewarm");
|
||||
return Ok(())
|
||||
};
|
||||
|
||||
// Use the caches to create a new executor
|
||||
let state_provider =
|
||||
CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
|
||||
|
||||
// clone and copy info required for execution
|
||||
let evm_config = self.evm_config.clone();
|
||||
|
||||
// spawn task executing the individual tx
|
||||
self.thread_pool.spawn(move || {
|
||||
let state_provider = StateProviderDatabase::new(&state_provider);
|
||||
|
||||
// create a new executor and disable nonce checks in the env
|
||||
let mut evm = evm_config.evm_for_block(state_provider, &block);
|
||||
|
||||
// create the tx env and reset nonce
|
||||
let mut tx_env = evm_config.tx_env(&tx, sender);
|
||||
tx_env.unset_nonce();
|
||||
|
||||
// exit early if execution is done
|
||||
if cancel_execution.is_cancelled() {
|
||||
return
|
||||
}
|
||||
|
||||
let ResultAndState { state, .. } = match evm.transact(tx_env) {
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
trace!(target: "engine::tree", %err, tx_hash=%tx.tx_hash(), %sender, "Error when executing prewarm transaction");
|
||||
return
|
||||
}
|
||||
};
|
||||
|
||||
// if execution is finished there is no point to sending proof targets
|
||||
if cancel_execution.is_cancelled() {
|
||||
return
|
||||
}
|
||||
|
||||
let Some(state_root_sender) = state_root_sender else {
|
||||
return
|
||||
};
|
||||
|
||||
let mut targets = MultiProofTargets::default();
|
||||
for (addr, account) in state {
|
||||
// if account was not touched, do not fetch for it
|
||||
if !account.is_touched() {
|
||||
continue
|
||||
}
|
||||
|
||||
let mut storage_set = B256Set::default();
|
||||
for (key, slot) in account.storage {
|
||||
// do nothing if unchanged
|
||||
if !slot.is_changed() {
|
||||
continue
|
||||
}
|
||||
|
||||
storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
|
||||
}
|
||||
|
||||
targets.insert(keccak256(addr), storage_set);
|
||||
}
|
||||
|
||||
let _ = state_root_sender.send(StateRootMessage::PrefetchProofs(targets));
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handles an error that occurred while inserting a block.
|
||||
///
|
||||
/// If this is a validation error this will mark the block as invalid.
|
||||
@ -2925,6 +3063,7 @@ mod tests {
|
||||
use reth_ethereum_engine_primitives::{EthEngineTypes, EthereumEngineValidator};
|
||||
use reth_ethereum_primitives::{Block, EthPrimitives};
|
||||
use reth_evm::test_utils::MockExecutorProvider;
|
||||
use reth_evm_ethereum::EthEvmConfig;
|
||||
use reth_primitives_traits::Block as _;
|
||||
use reth_provider::test_utils::MockEthProvider;
|
||||
use reth_trie::{updates::TrieUpdates, HashedPostState};
|
||||
@ -2995,6 +3134,7 @@ mod tests {
|
||||
MockExecutorProvider,
|
||||
EthEngineTypes,
|
||||
EthereumEngineValidator,
|
||||
EthEvmConfig,
|
||||
>,
|
||||
to_tree_tx: Sender<FromEngine<EngineApiRequest<EthEngineTypes, EthPrimitives>, Block>>,
|
||||
from_tree_rx: UnboundedReceiver<EngineApiEvent>,
|
||||
@ -3041,6 +3181,8 @@ mod tests {
|
||||
let (to_payload_service, _payload_command_rx) = unbounded_channel();
|
||||
let payload_builder = PayloadBuilderHandle::new(to_payload_service);
|
||||
|
||||
let evm_config = EthEvmConfig::new(chain_spec.clone());
|
||||
|
||||
let tree = EngineApiTreeHandler::new(
|
||||
provider.clone(),
|
||||
executor_provider.clone(),
|
||||
@ -3054,6 +3196,7 @@ mod tests {
|
||||
payload_builder,
|
||||
TreeConfig::default(),
|
||||
EngineApiKind::Ethereum,
|
||||
evm_config,
|
||||
);
|
||||
|
||||
let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
|
||||
|
||||
@ -509,6 +509,11 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a [`Sender`] that can be used to send arbitrary [`StateRootMessage`]s to this task.
|
||||
pub fn state_root_message_sender(&self) -> Sender<StateRootMessage> {
|
||||
self.tx.clone()
|
||||
}
|
||||
|
||||
/// Returns a [`StateHookSender`] that can be used to send state updates to this task.
|
||||
pub fn state_hook_sender(&self) -> StateHookSender {
|
||||
StateHookSender::new(self.tx.clone())
|
||||
|
||||
Reference in New Issue
Block a user