From 172369afd58b128fd0482dd0c385d9ccfc18f4fc Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Wed, 12 Feb 2025 18:48:14 +0400 Subject: [PATCH] feat: add Consensus to `ExecutionStage` (#14447) --- Cargo.lock | 4 +- bin/reth/src/cli/mod.rs | 16 ++-- bin/reth/src/commands/debug_cmd/execution.rs | 15 ++-- bin/reth/src/commands/debug_cmd/merkle.rs | 14 ++-- crates/cli/commands/Cargo.toml | 1 - crates/cli/commands/src/common.rs | 34 ++++++++- crates/cli/commands/src/import.rs | 18 ++--- .../cli/commands/src/stage/dump/execution.rs | 18 +++-- crates/cli/commands/src/stage/dump/merkle.rs | 2 + crates/cli/commands/src/stage/dump/mod.rs | 19 ++--- crates/cli/commands/src/stage/mod.rs | 13 ++-- crates/cli/commands/src/stage/run.rs | 20 +++-- crates/cli/commands/src/stage/unwind.rs | 12 ++- crates/ethereum/consensus/src/lib.rs | 4 +- crates/ethereum/consensus/src/validation.rs | 18 +++-- crates/ethereum/evm/src/execute.rs | 64 ++++++++-------- .../execution-types/src/execution_outcome.rs | 26 ++++++- crates/evm/src/execute.rs | 33 ++++++-- crates/exex/exex/src/backfill/job.rs | 22 ++++-- crates/exex/exex/src/backfill/test_utils.rs | 6 +- crates/node/builder/src/setup.rs | 9 ++- crates/optimism/cli/Cargo.toml | 2 + crates/optimism/cli/src/lib.rs | 6 +- crates/optimism/evm/src/execute.rs | 26 +++---- crates/stages/stages/Cargo.toml | 1 + crates/stages/stages/src/lib.rs | 5 +- crates/stages/stages/src/sets.rs | 75 ++++++++++++------- crates/stages/stages/src/stages/execution.rs | 57 +++++++++++--- crates/stages/stages/src/stages/mod.rs | 4 + testing/ef-tests/Cargo.toml | 1 + testing/ef-tests/src/cases/blockchain_test.rs | 4 +- 31 files changed, 355 insertions(+), 194 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 978225cdf..b1716805e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2757,6 +2757,7 @@ dependencies = [ "reth-chainspec", "reth-db", "reth-db-api", + "reth-ethereum-consensus", "reth-evm-ethereum", "reth-primitives", "reth-provider", @@ -6766,7 +6767,6 @@ dependencies = [ "reth-ecies", "reth-eth-wire", "reth-ethereum-cli", - "reth-ethereum-consensus", "reth-evm", "reth-exex", "reth-fs-util", @@ -8406,6 +8406,7 @@ dependencies = [ "reth-node-events", "reth-node-metrics", "reth-optimism-chainspec", + "reth-optimism-consensus", "reth-optimism-evm", "reth-optimism-node", "reth-optimism-primitives", @@ -9298,6 +9299,7 @@ dependencies = [ "reth-db", "reth-db-api", "reth-downloaders", + "reth-ethereum-consensus", "reth-etl", "reth-evm", "reth-evm-ethereum", diff --git a/bin/reth/src/cli/mod.rs b/bin/reth/src/cli/mod.rs index 0593c0f17..bceaa5f54 100644 --- a/bin/reth/src/cli/mod.rs +++ b/bin/reth/src/cli/mod.rs @@ -18,7 +18,7 @@ use reth_db::DatabaseEnv; use reth_ethereum_cli::chainspec::EthereumChainSpecParser; use reth_network::EthNetworkPrimitives; use reth_node_builder::{NodeBuilder, WithLaunchContext}; -use reth_node_ethereum::{EthExecutorProvider, EthereumNode}; +use reth_node_ethereum::{consensus::EthBeaconConsensus, EthExecutorProvider, EthereumNode}; use reth_node_metrics::recorder::install_prometheus_recorder; use reth_tracing::FileWorkerGuard; use std::{ffi::OsString, fmt, future::Future, sync::Arc}; @@ -152,6 +152,9 @@ impl, Ext: clap::Args + fmt::Debug> Cl let _ = install_prometheus_recorder(); let runner = CliRunner::default(); + let components = |spec: Arc| { + (EthExecutorProvider::ethereum(spec.clone()), EthBeaconConsensus::new(spec)) + }; match self.command { Commands::Node(command) => { runner.run_command_until_exit(|ctx| command.execute(ctx, launcher)) @@ -162,18 +165,15 
@@ impl, Ext: clap::Args + fmt::Debug> Cl Commands::InitState(command) => { runner.run_blocking_until_ctrl_c(command.execute::()) } - Commands::Import(command) => runner.run_blocking_until_ctrl_c( - command.execute::(EthExecutorProvider::ethereum), - ), + Commands::Import(command) => { + runner.run_blocking_until_ctrl_c(command.execute::(components)) + } Commands::DumpGenesis(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::Db(command) => { runner.run_blocking_until_ctrl_c(command.execute::()) } Commands::Stage(command) => runner.run_command_until_exit(|ctx| { - command.execute::( - ctx, - EthExecutorProvider::ethereum, - ) + command.execute::(ctx, components) }), Commands::P2P(command) => { runner.run_until_ctrl_c(command.execute::()) diff --git a/bin/reth/src/commands/debug_cmd/execution.rs b/bin/reth/src/commands/debug_cmd/execution.rs index bcf79ebf3..710c588fd 100644 --- a/bin/reth/src/commands/debug_cmd/execution.rs +++ b/bin/reth/src/commands/debug_cmd/execution.rs @@ -1,6 +1,6 @@ //! Command for debugging execution. -use crate::{api::BlockTy, args::NetworkArgs, utils::get_single_header}; +use crate::{args::NetworkArgs, utils::get_single_header}; use alloy_eips::BlockHashOrNumber; use alloy_primitives::{BlockNumber, B256}; use clap::Parser; @@ -11,7 +11,7 @@ use reth_cli_commands::common::{AccessRights, CliNodeTypes, Environment, Environ use reth_cli_runner::CliContext; use reth_cli_util::get_secret_key; use reth_config::Config; -use reth_consensus::Consensus; +use reth_consensus::FullConsensus; use reth_db::DatabaseEnv; use reth_downloaders::{ bodies::bodies::BodiesDownloaderBuilder, @@ -64,7 +64,7 @@ impl> Command { &self, config: &Config, client: Client, - consensus: Arc, Error = ConsensusError>>, + consensus: Arc>, provider_factory: ProviderFactory, task_executor: &TaskExecutor, static_file_producer: StaticFileProducer>, @@ -79,7 +79,7 @@ impl> Command { .into_task_with(task_executor); let body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies) - .build(client, Arc::clone(&consensus), provider_factory.clone()) + .build(client, consensus.clone().as_consensus(), provider_factory.clone()) .into_task_with(task_executor); let stage_conf = &config.stages; @@ -94,7 +94,7 @@ impl> Command { DefaultStages::new( provider_factory.clone(), tip_rx, - Arc::clone(&consensus), + consensus.clone(), header_downloader, body_downloader, executor.clone(), @@ -103,6 +103,7 @@ impl> Command { ) .set(ExecutionStage::new( executor, + consensus.clone(), ExecutionStageThresholds { max_blocks: None, max_changes: None, @@ -171,7 +172,7 @@ impl> Command { let Environment { provider_factory, config, data_dir } = self.env.init::(AccessRights::RW)?; - let consensus: Arc, Error = ConsensusError>> = + let consensus: Arc> = Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec())); // Configure and build network @@ -195,7 +196,7 @@ impl> Command { let mut pipeline = self.build_pipeline( &config, fetch_client.clone(), - Arc::clone(&consensus), + consensus.clone(), provider_factory.clone(), &ctx.task_executor, static_file_producer, diff --git a/bin/reth/src/commands/debug_cmd/merkle.rs b/bin/reth/src/commands/debug_cmd/merkle.rs index aa94afc06..133b1d78a 100644 --- a/bin/reth/src/commands/debug_cmd/merkle.rs +++ b/bin/reth/src/commands/debug_cmd/merkle.rs @@ -1,5 +1,5 @@ //! Command for debugging merkle tree calculation. 
-use crate::{args::NetworkArgs, utils::get_single_header}; +use crate::{args::NetworkArgs, providers::ExecutionOutcome, utils::get_single_header}; use alloy_consensus::BlockHeader; use alloy_eips::BlockHashOrNumber; use backon::{ConstantBuilder, Retryable}; @@ -14,7 +14,7 @@ use reth_consensus::{Consensus, ConsensusError}; use reth_db::tables; use reth_db_api::{cursor::DbCursorRO, transaction::DbTx}; use reth_ethereum_primitives::EthPrimitives; -use reth_evm::execute::{BatchExecutor, BlockExecutorProvider}; +use reth_evm::execute::{BlockExecutorProvider, Executor}; use reth_network::{BlockDownloaderProvider, NetworkHandle}; use reth_network_api::NetworkInfo; use reth_network_p2p::full_block::FullBlockClient; @@ -161,14 +161,12 @@ impl> Command { provider_rw.insert_block(sealed_block.clone(), StorageLocation::Database)?; td += sealed_block.difficulty(); - let mut executor = executor_provider.batch_executor(StateProviderDatabase::new( - LatestStateProviderRef::new(&provider_rw), - )); - executor.execute_and_verify_one(&sealed_block)?; - let execution_outcome = executor.finalize(); + let executor = executor_provider + .executor(StateProviderDatabase::new(LatestStateProviderRef::new(&provider_rw))); + let output = executor.execute(&sealed_block)?; provider_rw.write_state( - &execution_outcome, + &ExecutionOutcome::single(block_number, output), OriginalValuesKnown::Yes, StorageLocation::Database, )?; diff --git a/crates/cli/commands/Cargo.toml b/crates/cli/commands/Cargo.toml index 7bf89c845..9e6136f48 100644 --- a/crates/cli/commands/Cargo.toml +++ b/crates/cli/commands/Cargo.toml @@ -11,7 +11,6 @@ repository.workspace = true [dependencies] # reth -reth-ethereum-consensus.workspace = true reth-chainspec.workspace = true reth-cli.workspace = true reth-ethereum-cli.workspace = true diff --git a/crates/cli/commands/src/common.rs b/crates/cli/commands/src/common.rs index 192cf9cb4..2ee47c1cd 100644 --- a/crates/cli/commands/src/common.rs +++ b/crates/cli/commands/src/common.rs @@ -5,11 +5,11 @@ use clap::Parser; use reth_chainspec::EthChainSpec; use reth_cli::chainspec::ChainSpecParser; use reth_config::{config::EtlConfig, Config}; -use reth_consensus::noop::NoopConsensus; +use reth_consensus::{noop::NoopConsensus, ConsensusError, FullConsensus}; use reth_db::{init_db, open_db_read_only, DatabaseEnv}; use reth_db_common::init::init_genesis; use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader}; -use reth_evm::noop::NoopBlockExecutorProvider; +use reth_evm::{execute::BlockExecutorProvider, noop::NoopBlockExecutorProvider}; use reth_node_builder::{NodeTypesWithDBAdapter, NodeTypesWithEngine}; use reth_node_core::{ args::{DatabaseArgs, DatadirArgs}, @@ -199,3 +199,33 @@ impl AccessRights { /// [`NodeTypes`](reth_node_builder::NodeTypes) in CLI. pub trait CliNodeTypes: NodeTypesWithEngine + NodeTypesForProvider {} impl CliNodeTypes for N where N: NodeTypesWithEngine + NodeTypesForProvider {} + +/// Helper trait aggregating components required for the CLI. +pub trait CliNodeComponents { + /// Block executor. + type Executor: BlockExecutorProvider; + /// Consensus implementation. + type Consensus: FullConsensus + Clone + 'static; + + /// Returns the block executor. + fn executor(&self) -> &Self::Executor; + /// Returns the consensus implementation. 
+ fn consensus(&self) -> &Self::Consensus; +} + +impl CliNodeComponents for (E, C) +where + E: BlockExecutorProvider, + C: FullConsensus + Clone + 'static, +{ + type Executor = E; + type Consensus = C; + + fn executor(&self) -> &Self::Executor { + &self.0 + } + + fn consensus(&self) -> &Self::Consensus { + &self.1 + } +} diff --git a/crates/cli/commands/src/import.rs b/crates/cli/commands/src/import.rs index 13b3e3f6c..1353bb69d 100644 --- a/crates/cli/commands/src/import.rs +++ b/crates/cli/commands/src/import.rs @@ -1,12 +1,12 @@ //! Command that initializes the node by importing a chain from a file. -use crate::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs}; +use crate::common::{AccessRights, CliNodeComponents, CliNodeTypes, Environment, EnvironmentArgs}; use alloy_primitives::B256; use clap::Parser; use futures::{Stream, StreamExt}; use reth_chainspec::{EthChainSpec, EthereumHardforks}; use reth_cli::chainspec::ChainSpecParser; use reth_config::Config; -use reth_consensus::{Consensus, ConsensusError}; +use reth_consensus::{ConsensusError, FullConsensus}; use reth_db::tables; use reth_db_api::transaction::DbTx; use reth_downloaders::{ @@ -14,7 +14,6 @@ use reth_downloaders::{ file_client::{ChunkedFileReader, FileClient, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE}, headers::reverse_headers::ReverseHeadersDownloaderBuilder, }; -use reth_ethereum_consensus::EthBeaconConsensus; use reth_evm::execute::BlockExecutorProvider; use reth_network_p2p::{ bodies::downloader::BodyDownloader, @@ -58,11 +57,11 @@ pub struct ImportCommand { impl> ImportCommand { /// Execute `import` command - pub async fn execute(self, executor: F) -> eyre::Result<()> + pub async fn execute(self, components: F) -> eyre::Result<()> where N: CliNodeTypes, - E: BlockExecutorProvider, - F: FnOnce(Arc) -> E, + Comp: CliNodeComponents, + F: FnOnce(Arc) -> Comp, { info!(target: "reth::cli", "reth {} starting", SHORT_VERSION); @@ -77,8 +76,9 @@ impl> ImportComm let Environment { provider_factory, config, .. 
} = self.env.init::(AccessRights::RW)?; - let executor = executor(provider_factory.chain_spec()); - let consensus = Arc::new(EthBeaconConsensus::new(self.env.chain.clone())); + let components = components(provider_factory.chain_spec()); + let executor = components.executor().clone(); + let consensus = Arc::new(components.consensus().clone()); info!(target: "reth::cli", "Consensus engine initialized"); // open file @@ -179,7 +179,7 @@ pub fn build_import_pipeline( ) -> eyre::Result<(Pipeline, impl Stream>)> where N: ProviderNodeTypes + CliNodeTypes, - C: Consensus, Error = ConsensusError> + 'static, + C: FullConsensus + 'static, E: BlockExecutorProvider, { if !file_client.has_canonical_blocks() { diff --git a/crates/cli/commands/src/stage/dump/execution.rs b/crates/cli/commands/src/stage/dump/execution.rs index 6910f76d1..313ca88f5 100644 --- a/crates/cli/commands/src/stage/dump/execution.rs +++ b/crates/cli/commands/src/stage/dump/execution.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use super::setup; +use reth_consensus::{noop::NoopConsensus, ConsensusError, FullConsensus}; use reth_db::{tables, DatabaseEnv}; use reth_db_api::{ cursor::DbCursorRO, database::Database, table::TableImporter, transaction::DbTx, @@ -16,17 +17,19 @@ use reth_provider::{ use reth_stages::{stages::ExecutionStage, Stage, StageCheckpoint, UnwindInput}; use tracing::info; -pub(crate) async fn dump_execution_stage( +pub(crate) async fn dump_execution_stage( db_tool: &DbTool, from: u64, to: u64, output_datadir: ChainPath, should_run: bool, executor: E, + consensus: C, ) -> eyre::Result<()> where N: ProviderNodeTypes>, E: BlockExecutorProvider, + C: FullConsensus + 'static, { let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?; @@ -44,6 +47,7 @@ where to, from, executor, + consensus, )?; } @@ -139,8 +143,10 @@ fn unwind_and_copy( ) -> eyre::Result<()> { let provider = db_tool.provider_factory.database_provider_rw()?; - let mut exec_stage = - ExecutionStage::new_with_executor(NoopBlockExecutorProvider::::default()); + let mut exec_stage = ExecutionStage::new_with_executor( + NoopBlockExecutorProvider::::default(), + NoopConsensus::arc(), + ); exec_stage.unwind( &provider, @@ -162,19 +168,21 @@ fn unwind_and_copy( } /// Try to re-execute the stage without committing -fn dry_run( +fn dry_run( output_provider_factory: ProviderFactory, to: u64, from: u64, executor: E, + consensus: C, ) -> eyre::Result<()> where N: ProviderNodeTypes, E: BlockExecutorProvider, + C: FullConsensus + 'static, { info!(target: "reth::cli", "Executing stage. 
[dry-run]"); - let mut exec_stage = ExecutionStage::new_with_executor(executor); + let mut exec_stage = ExecutionStage::new_with_executor(executor, Arc::new(consensus)); let input = reth_stages::ExecInput { target: Some(to), checkpoint: Some(StageCheckpoint::new(from)) }; diff --git a/crates/cli/commands/src/stage/dump/merkle.rs b/crates/cli/commands/src/stage/dump/merkle.rs index db83de7c0..e1eb24180 100644 --- a/crates/cli/commands/src/stage/dump/merkle.rs +++ b/crates/cli/commands/src/stage/dump/merkle.rs @@ -4,6 +4,7 @@ use super::setup; use alloy_primitives::BlockNumber; use eyre::Result; use reth_config::config::EtlConfig; +use reth_consensus::noop::NoopConsensus; use reth_db::{tables, DatabaseEnv}; use reth_db_api::{database::Database, table::TableImporter}; use reth_db_common::DbTool; @@ -96,6 +97,7 @@ fn unwind_and_copy( // Bring Plainstate to TO (hashing stage execution requires it) let mut exec_stage = ExecutionStage::new( NoopBlockExecutorProvider::::default(), // Not necessary for unwinding. + NoopConsensus::arc(), ExecutionStageThresholds { max_blocks: Some(u64::MAX), max_changes: None, diff --git a/crates/cli/commands/src/stage/dump/mod.rs b/crates/cli/commands/src/stage/dump/mod.rs index ff5ac60f5..6593181c4 100644 --- a/crates/cli/commands/src/stage/dump/mod.rs +++ b/crates/cli/commands/src/stage/dump/mod.rs @@ -1,5 +1,5 @@ //! Database debugging tool -use crate::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs}; +use crate::common::{AccessRights, CliNodeComponents, CliNodeTypes, Environment, EnvironmentArgs}; use clap::Parser; use reth_chainspec::{EthChainSpec, EthereumHardforks}; use reth_cli::chainspec::ChainSpecParser; @@ -9,7 +9,6 @@ use reth_db_api::{ transaction::DbTx, }; use reth_db_common::DbTool; -use reth_evm::execute::BlockExecutorProvider; use reth_node_builder::NodeTypesWithDB; use reth_node_core::{ args::DatadirArgs, @@ -80,29 +79,31 @@ macro_rules! handle_stage { $stage_fn($tool, *from, *to, output_datadir, *dry_run).await? }}; - ($stage_fn:ident, $tool:expr, $command:expr, $executor:expr) => {{ + ($stage_fn:ident, $tool:expr, $command:expr, $executor:expr, $consensus:expr) => {{ let StageCommand { output_datadir, from, to, dry_run, .. } = $command; let output_datadir = output_datadir.with_chain($tool.chain().chain(), DatadirArgs::default()); - $stage_fn($tool, *from, *to, output_datadir, *dry_run, $executor).await? + $stage_fn($tool, *from, *to, output_datadir, *dry_run, $executor, $consensus).await? }}; } impl> Command { /// Execute `dump-stage` command - pub async fn execute(self, executor: F) -> eyre::Result<()> + pub async fn execute(self, components: F) -> eyre::Result<()> where N: CliNodeTypes, - E: BlockExecutorProvider, - F: FnOnce(Arc) -> E, + Comp: CliNodeComponents, + F: FnOnce(Arc) -> Comp, { let Environment { provider_factory, .. 
} = self.env.init::(AccessRights::RO)?; let tool = DbTool::new(provider_factory)?; match &self.command { Stages::Execution(cmd) => { - let executor = executor(tool.chain()); - handle_stage!(dump_execution_stage, &tool, cmd, executor) + let components = components(tool.chain()); + let executor = components.executor().clone(); + let consensus = components.consensus().clone(); + handle_stage!(dump_execution_stage, &tool, cmd, executor, consensus) } Stages::StorageHashing(cmd) => handle_stage!(dump_hashing_storage_stage, &tool, cmd), Stages::AccountHashing(cmd) => handle_stage!(dump_hashing_account_stage, &tool, cmd), diff --git a/crates/cli/commands/src/stage/mod.rs b/crates/cli/commands/src/stage/mod.rs index e9b2123bc..4450a698d 100644 --- a/crates/cli/commands/src/stage/mod.rs +++ b/crates/cli/commands/src/stage/mod.rs @@ -2,13 +2,12 @@ use std::sync::Arc; -use crate::common::CliNodeTypes; +use crate::common::{CliNodeComponents, CliNodeTypes}; use clap::{Parser, Subcommand}; use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks}; use reth_cli::chainspec::ChainSpecParser; use reth_cli_runner::CliContext; use reth_eth_wire::NetPrimitivesFor; -use reth_evm::execute::BlockExecutorProvider; pub mod drop; pub mod dump; @@ -42,17 +41,17 @@ pub enum Subcommands { impl> Command { /// Execute `stage` command - pub async fn execute(self, ctx: CliContext, executor: F) -> eyre::Result<()> + pub async fn execute(self, ctx: CliContext, components: F) -> eyre::Result<()> where N: CliNodeTypes, - E: BlockExecutorProvider, - F: FnOnce(Arc) -> E, + Comp: CliNodeComponents, + F: FnOnce(Arc) -> Comp, P: NetPrimitivesFor, { match self.command { - Subcommands::Run(command) => command.execute::(ctx, executor).await, + Subcommands::Run(command) => command.execute::(ctx, components).await, Subcommands::Drop(command) => command.execute::().await, - Subcommands::Dump(command) => command.execute::(executor).await, + Subcommands::Dump(command) => command.execute::(components).await, Subcommands::Unwind(command) => command.execute::().await, } } diff --git a/crates/cli/commands/src/stage/run.rs b/crates/cli/commands/src/stage/run.rs index fd921e053..8cfc242ed 100644 --- a/crates/cli/commands/src/stage/run.rs +++ b/crates/cli/commands/src/stage/run.rs @@ -2,7 +2,7 @@ //! //! Stage debugging tool -use crate::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs}; +use crate::common::{AccessRights, CliNodeComponents, CliNodeTypes, Environment, EnvironmentArgs}; use alloy_eips::BlockHashOrNumber; use alloy_primitives::Sealable; use clap::Parser; @@ -17,8 +17,6 @@ use reth_downloaders::{ headers::reverse_headers::ReverseHeadersDownloaderBuilder, }; use reth_eth_wire::NetPrimitivesFor; -use reth_ethereum_consensus::EthBeaconConsensus; -use reth_evm::execute::BlockExecutorProvider; use reth_exex::ExExManagerHandle; use reth_network::BlockDownloaderProvider; use reth_network_p2p::HeadersClient; @@ -105,11 +103,11 @@ pub struct Command { impl> Command { /// Execute `stage` command - pub async fn execute(self, ctx: CliContext, executor: F) -> eyre::Result<()> + pub async fn execute(self, ctx: CliContext, components: F) -> eyre::Result<()> where N: CliNodeTypes, - E: BlockExecutorProvider, - F: FnOnce(Arc) -> E, + Comp: CliNodeComponents, + F: FnOnce(Arc) -> Comp, P: NetPrimitivesFor, { // Raise the fd limit of the process. 
@@ -120,6 +118,7 @@ impl self.env.init::(AccessRights::RW)?; let mut provider_rw = provider_factory.database_provider_rw()?; + let components = components(provider_factory.chain_spec()); if let Some(listen_addr) = self.metrics { info!(target: "reth::cli", "Starting metrics endpoint at {}", listen_addr); @@ -162,8 +161,7 @@ impl let (mut exec_stage, mut unwind_stage): (Box>, Option>>) = match self.stage { StageEnum::Headers => { - let consensus = - Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec())); + let consensus = Arc::new(components.consensus().clone()); let network_secret_path = self .network @@ -215,8 +213,7 @@ impl ) } StageEnum::Bodies => { - let consensus = - Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec())); + let consensus = Arc::new(components.consensus().clone()); let mut config = config; config.peers.trusted_nodes_only = self.network.trusted_only; @@ -267,7 +264,8 @@ impl ), StageEnum::Execution => ( Box::new(ExecutionStage::new( - executor(provider_factory.chain_spec()), + components.executor().clone(), + Arc::new(components.consensus().clone()), ExecutionStageThresholds { max_blocks: Some(batch_size), max_changes: None, diff --git a/crates/cli/commands/src/stage/unwind.rs b/crates/cli/commands/src/stage/unwind.rs index a8f1cebea..74d612626 100644 --- a/crates/cli/commands/src/stage/unwind.rs +++ b/crates/cli/commands/src/stage/unwind.rs @@ -115,9 +115,14 @@ impl> Command let builder = if self.offline { Pipeline::::builder().add_stages( - OfflineStages::new(executor, config.stages, prune_modes.clone()) - .builder() - .disable(reth_stages::StageId::SenderRecovery), + OfflineStages::new( + executor, + NoopConsensus::arc(), + config.stages, + prune_modes.clone(), + ) + .builder() + .disable(reth_stages::StageId::SenderRecovery), ) } else { Pipeline::::builder().with_tip_sender(tip_tx).add_stages( @@ -133,6 +138,7 @@ impl> Command ) .set(ExecutionStage::new( executor, + Arc::new(NoopConsensus::default()), ExecutionStageThresholds { max_blocks: None, max_changes: None, diff --git a/crates/ethereum/consensus/src/lib.rs b/crates/ethereum/consensus/src/lib.rs index 904185c9c..7bdf24133 100644 --- a/crates/ethereum/consensus/src/lib.rs +++ b/crates/ethereum/consensus/src/lib.rs @@ -21,7 +21,7 @@ use reth_consensus_common::validation::{ validate_against_parent_timestamp, validate_block_pre_execution, validate_body_against_header, validate_header_base_fee, validate_header_extra_data, validate_header_gas, }; -use reth_primitives::{NodePrimitives, Receipt, RecoveredBlock, SealedBlock, SealedHeader}; +use reth_primitives::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader}; use reth_primitives_traits::{ constants::{GAS_LIMIT_BOUND_DIVISOR, MINIMUM_GAS_LIMIT}, Block, BlockHeader, @@ -99,7 +99,7 @@ impl EthBeaconConsensus impl FullConsensus for EthBeaconConsensus where ChainSpec: Send + Sync + EthChainSpec + EthereumHardforks + Debug, - N: NodePrimitives, + N: NodePrimitives, { fn validate_block_post_execution( &self, diff --git a/crates/ethereum/consensus/src/validation.rs b/crates/ethereum/consensus/src/validation.rs index 8d6d38549..69145b9d3 100644 --- a/crates/ethereum/consensus/src/validation.rs +++ b/crates/ethereum/consensus/src/validation.rs @@ -3,26 +3,27 @@ use alloy_eips::eip7685::Requests; use alloy_primitives::{Bloom, B256}; use reth_chainspec::EthereumHardforks; use reth_consensus::ConsensusError; -use reth_primitives::{gas_spent_by_transactions, GotExpected, Receipt, RecoveredBlock}; -use reth_primitives_traits::Block; +use 
reth_primitives::{gas_spent_by_transactions, GotExpected, RecoveredBlock}; +use reth_primitives_traits::{Block, Receipt}; /// Validate a block with regard to execution results: /// /// - Compares the receipts root in the block header to the block body /// - Compares the gas used in the block header to the actual gas usage after execution -pub fn validate_block_post_execution( +pub fn validate_block_post_execution( block: &RecoveredBlock, chain_spec: &ChainSpec, - receipts: &[Receipt], + receipts: &[R], requests: &Requests, ) -> Result<(), ConsensusError> where B: Block, + R: Receipt, ChainSpec: EthereumHardforks, { // Check if gas used matches the value set in header. let cumulative_gas_used = - receipts.last().map(|receipt| receipt.cumulative_gas_used).unwrap_or(0); + receipts.last().map(|receipt| receipt.cumulative_gas_used()).unwrap_or(0); if block.header().gas_used() != cumulative_gas_used { return Err(ConsensusError::BlockGasUsed { gas: GotExpected { got: cumulative_gas_used, expected: block.header().gas_used() }, @@ -61,13 +62,13 @@ where /// Calculate the receipts root, and compare it against against the expected receipts root and logs /// bloom. -fn verify_receipts( +fn verify_receipts( expected_receipts_root: B256, expected_logs_bloom: Bloom, - receipts: &[Receipt], + receipts: &[R], ) -> Result<(), ConsensusError> { // Calculate receipts root. - let receipts_with_bloom = receipts.iter().map(Receipt::with_bloom_ref).collect::>(); + let receipts_with_bloom = receipts.iter().map(TxReceipt::with_bloom_ref).collect::>(); let receipts_root = calculate_receipt_root(&receipts_with_bloom); // Calculate header logs bloom. @@ -109,6 +110,7 @@ fn compare_receipts_root_and_logs_bloom( #[cfg(test)] mod tests { use alloy_primitives::hex; + use reth_primitives::Receipt; use super::*; diff --git a/crates/ethereum/evm/src/execute.rs b/crates/ethereum/evm/src/execute.rs index d970e421c..29f51231f 100644 --- a/crates/ethereum/evm/src/execute.rs +++ b/crates/ethereum/evm/src/execute.rs @@ -291,10 +291,8 @@ mod tests { }; use alloy_primitives::{b256, fixed_bytes, keccak256, Bytes, TxKind, B256, U256}; use reth_chainspec::{ChainSpecBuilder, ForkCondition}; - use reth_evm::execute::{ - BasicBlockExecutorProvider, BatchExecutor, BlockExecutorProvider, Executor, - }; - use reth_execution_types::BlockExecutionOutput; + use reth_evm::execute::{BasicBlockExecutorProvider, BlockExecutorProvider, Executor}; + use reth_execution_types::BlockExecutionResult; use reth_primitives::{Account, Block, BlockBody, Transaction}; use reth_primitives_traits::{crypto::secp256k1::public_key_to_address, Block as _}; use reth_revm::{ @@ -368,11 +366,11 @@ mod tests { let provider = executor_provider(chain_spec); - let mut executor = provider.batch_executor(StateProviderDatabase::new(&db)); + let mut executor = provider.executor(StateProviderDatabase::new(&db)); // attempt to execute a block without parent beacon block root, expect err let err = executor - .execute_and_verify_one(&RecoveredBlock::new_unhashed( + .execute_one(&RecoveredBlock::new_unhashed( Block { header: header.clone(), body: BlockBody { transactions: vec![], ommers: vec![], withdrawals: None }, @@ -393,7 +391,7 @@ mod tests { // Now execute a block with the fixed header, ensure that it does not fail executor - .execute_and_verify_one(&RecoveredBlock::new_unhashed( + .execute_one(&RecoveredBlock::new_unhashed( Block { header: header.clone(), body: BlockBody { transactions: vec![], ommers: vec![], withdrawals: None }, @@ -452,8 +450,8 @@ mod tests { // attempt 
to execute an empty block with parent beacon block root, this should not fail provider - .batch_executor(StateProviderDatabase::new(&db)) - .execute_and_verify_one(&RecoveredBlock::new_unhashed( + .executor(StateProviderDatabase::new(&db)) + .execute_one(&RecoveredBlock::new_unhashed( Block { header, body: BlockBody { transactions: vec![], ommers: vec![], withdrawals: None }, @@ -493,11 +491,11 @@ mod tests { ..Header::default() }; - let mut executor = provider.batch_executor(StateProviderDatabase::new(&db)); + let mut executor = provider.executor(StateProviderDatabase::new(&db)); // attempt to execute an empty block with parent beacon block root, this should not fail executor - .execute_and_verify_one(&RecoveredBlock::new_unhashed( + .execute_one(&RecoveredBlock::new_unhashed( Block { header, body: BlockBody { transactions: vec![], ommers: vec![], withdrawals: None }, @@ -528,12 +526,12 @@ mod tests { let mut header = chain_spec.genesis_header().clone(); let provider = executor_provider(chain_spec); - let mut executor = provider.batch_executor(StateProviderDatabase::new(&db)); + let mut executor = provider.executor(StateProviderDatabase::new(&db)); // attempt to execute the genesis block with non-zero parent beacon block root, expect err header.parent_beacon_block_root = Some(B256::with_last_byte(0x69)); let _err = executor - .execute_and_verify_one(&RecoveredBlock::new_unhashed( + .execute_one(&RecoveredBlock::new_unhashed( Block { header: header.clone(), body: Default::default() }, vec![], )) @@ -548,7 +546,7 @@ mod tests { // now try to process the genesis block again, this time ensuring that a system contract // call does not occur executor - .execute_and_verify_one(&RecoveredBlock::new_unhashed( + .execute_one(&RecoveredBlock::new_unhashed( Block { header, body: Default::default() }, vec![], )) @@ -592,11 +590,11 @@ mod tests { let provider = executor_provider(chain_spec); // execute header - let mut executor = provider.batch_executor(StateProviderDatabase::new(&db)); + let mut executor = provider.executor(StateProviderDatabase::new(&db)); // Now execute a block with the fixed header, ensure that it does not fail executor - .execute_and_verify_one(&RecoveredBlock::new_unhashed( + .execute_one(&RecoveredBlock::new_unhashed( Block { header: header.clone(), body: Default::default() }, vec![], )) @@ -659,14 +657,14 @@ mod tests { ); let provider = executor_provider(chain_spec); - let mut executor = provider.batch_executor(StateProviderDatabase::new(&db)); + let mut executor = provider.executor(StateProviderDatabase::new(&db)); // construct the header for block one let header = Header { timestamp: 1, number: 1, ..Header::default() }; // attempt to execute an empty block, this should not fail executor - .execute_and_verify_one(&RecoveredBlock::new_unhashed( + .execute_one(&RecoveredBlock::new_unhashed( Block { header, body: Default::default() }, vec![], )) @@ -700,11 +698,11 @@ mod tests { let header = chain_spec.genesis_header().clone(); let provider = executor_provider(chain_spec); - let mut executor = provider.batch_executor(StateProviderDatabase::new(&db)); + let mut executor = provider.executor(StateProviderDatabase::new(&db)); // attempt to execute genesis block, this should not fail executor - .execute_and_verify_one(&RecoveredBlock::new_unhashed( + .execute_one(&RecoveredBlock::new_unhashed( Block { header, body: Default::default() }, vec![], )) @@ -747,11 +745,11 @@ mod tests { ..Header::default() }; let provider = executor_provider(chain_spec); - let mut executor = 
provider.batch_executor(StateProviderDatabase::new(&db)); + let mut executor = provider.executor(StateProviderDatabase::new(&db)); // attempt to execute the fork activation block, this should not fail executor - .execute_and_verify_one(&RecoveredBlock::new_unhashed( + .execute_one(&RecoveredBlock::new_unhashed( Block { header, body: Default::default() }, vec![], )) @@ -791,7 +789,7 @@ mod tests { ); let provider = executor_provider(chain_spec); - let mut executor = provider.batch_executor(StateProviderDatabase::new(&db)); + let mut executor = provider.executor(StateProviderDatabase::new(&db)); let header = Header { parent_hash: B256::random(), @@ -805,7 +803,7 @@ mod tests { // attempt to execute the fork activation block, this should not fail executor - .execute_and_verify_one(&RecoveredBlock::new_unhashed( + .execute_one(&RecoveredBlock::new_unhashed( Block { header, body: Default::default() }, vec![], )) @@ -834,11 +832,11 @@ mod tests { let header_hash = header.hash_slow(); let provider = executor_provider(chain_spec); - let mut executor = provider.batch_executor(StateProviderDatabase::new(&db)); + let mut executor = provider.executor(StateProviderDatabase::new(&db)); // attempt to execute the genesis block, this should not fail executor - .execute_and_verify_one(&RecoveredBlock::new_unhashed( + .execute_one(&RecoveredBlock::new_unhashed( Block { header, body: Default::default() }, vec![], )) @@ -869,7 +867,7 @@ mod tests { let header_hash = header.hash_slow(); executor - .execute_and_verify_one(&RecoveredBlock::new_unhashed( + .execute_one(&RecoveredBlock::new_unhashed( Block { header, body: Default::default() }, vec![], )) @@ -903,7 +901,7 @@ mod tests { }; executor - .execute_and_verify_one(&RecoveredBlock::new_unhashed( + .execute_one(&RecoveredBlock::new_unhashed( Block { header, body: Default::default() }, vec![], )) @@ -984,10 +982,10 @@ mod tests { let provider = executor_provider(chain_spec); - let executor = provider.executor(StateProviderDatabase::new(&db)); + let mut executor = provider.executor(StateProviderDatabase::new(&db)); - let BlockExecutionOutput { receipts, requests, .. } = executor - .execute( + let BlockExecutionResult { receipts, requests, .. 
} = executor + .execute_one( &Block { header, body: BlockBody { transactions: vec![tx], ..Default::default() } } .try_into_recovered() .unwrap(), @@ -1060,10 +1058,10 @@ mod tests { ); // Create an executor from the state provider - let executor = executor_provider(chain_spec).executor(StateProviderDatabase::new(&db)); + let mut executor = executor_provider(chain_spec).executor(StateProviderDatabase::new(&db)); // Execute the block and capture the result - let exec_result = executor.execute( + let exec_result = executor.execute_one( &Block { header, body: BlockBody { transactions: vec![tx], ..Default::default() } } .try_into_recovered() .unwrap(), diff --git a/crates/evm/execution-types/src/execution_outcome.rs b/crates/evm/execution-types/src/execution_outcome.rs index 1f579e3fd..280cd457d 100644 --- a/crates/evm/execution-types/src/execution_outcome.rs +++ b/crates/evm/execution-types/src/execution_outcome.rs @@ -1,4 +1,4 @@ -use crate::BlockExecutionOutput; +use crate::{BlockExecutionOutput, BlockExecutionResult}; use alloc::{vec, vec::Vec}; use alloy_eips::eip7685::Requests; use alloy_primitives::{logs_bloom, map::HashMap, Address, BlockNumber, Bloom, Log, B256, U256}; @@ -128,6 +128,30 @@ impl ExecutionOutcome { Self { bundle, receipts, first_block, requests } } + /// Creates a new `ExecutionOutcome` from a single block execution result. + pub fn single(block_number: u64, result: BlockExecutionOutput) -> Self { + Self { + bundle: result.state, + receipts: vec![result.receipts], + first_block: block_number, + requests: vec![result.requests], + } + } + + /// Creates a new `ExecutionOutcome` from multiple [`BlockExecutionResult`]s. + pub fn from_blocks( + first_block: u64, + bundle: BundleState, + results: Vec>, + ) -> Self { + let mut value = Self { bundle, first_block, receipts: Vec::new(), requests: Vec::new() }; + for result in results { + value.receipts.push(result.receipts); + value.requests.push(result.requests); + } + value + } + /// Return revm bundle state. pub const fn state(&self) -> &BundleState { &self.bundle diff --git a/crates/evm/src/execute.rs b/crates/evm/src/execute.rs index b9a81232d..329437906 100644 --- a/crates/evm/src/execute.rs +++ b/crates/evm/src/execute.rs @@ -65,6 +65,30 @@ pub trait Executor: Sized { Ok(BlockExecutionOutput { state: state.take_bundle(), receipts, requests, gas_used }) } + /// Executes multiple inputs in the batch, and returns an aggregated [`ExecutionOutcome`]. + fn execute_batch<'a, I>( + mut self, + blocks: I, + ) -> Result::Receipt>, Self::Error> + where + I: IntoIterator::Block>>, + { + let mut results = Vec::new(); + let mut first_block = None; + for block in blocks { + if first_block.is_none() { + first_block = Some(block.header().number()); + } + results.push(self.execute_one(block)?); + } + + Ok(ExecutionOutcome::from_blocks( + first_block.unwrap_or_default(), + self.into_state().take_bundle(), + results, + )) + } + /// Executes the EVM with the given input and accepts a state closure that is invoked with /// the EVM state after execution. 
fn execute_with_state_closure( @@ -377,13 +401,10 @@ where F: OnStateHook + 'static, { self.strategy.with_state_hook(Some(Box::new(state_hook))); + let result = self.execute_one(block); + self.strategy.with_state_hook(None); - self.strategy.apply_pre_execution_changes(block)?; - let ExecuteOutput { receipts, gas_used } = self.strategy.execute_transactions(block)?; - let requests = self.strategy.apply_post_execution_changes(block, &receipts)?; - self.strategy.state_mut().merge_transitions(BundleRetention::Reverts); - - Ok(BlockExecutionResult { receipts, requests, gas_used }) + result } fn into_state(self) -> State { diff --git a/crates/exex/exex/src/backfill/job.rs b/crates/exex/exex/src/backfill/job.rs index b32fe2ca1..d7069eb56 100644 --- a/crates/exex/exex/src/backfill/job.rs +++ b/crates/exex/exex/src/backfill/job.rs @@ -7,13 +7,14 @@ use std::{ use alloy_consensus::BlockHeader; use alloy_primitives::BlockNumber; use reth_evm::execute::{ - BatchExecutor, BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider, Executor, + BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider, Executor, }; use reth_node_api::{Block as _, BlockBody as _, NodePrimitives}; use reth_primitives::{Receipt, RecoveredBlock}; use reth_primitives_traits::{format_gas_throughput, SignedTransaction}; use reth_provider::{ - BlockReader, Chain, HeaderProvider, ProviderError, StateProviderFactory, TransactionVariant, + BlockReader, Chain, ExecutionOutcome, HeaderProvider, ProviderError, StateProviderFactory, + TransactionVariant, }; use reth_prune_types::PruneModes; use reth_revm::database::StateProviderDatabase; @@ -75,7 +76,7 @@ where "Executing block range" ); - let mut executor = self.executor.batch_executor(StateProviderDatabase::new( + let mut executor = self.executor.executor(StateProviderDatabase::new( self.provider.history_by_block_number(self.range.start().saturating_sub(1))?, )); @@ -85,6 +86,7 @@ where let batch_start = Instant::now(); let mut blocks = Vec::new(); + let mut results = Vec::new(); for block_number in self.range.clone() { // Fetch the block let fetch_block_start = Instant::now(); @@ -110,19 +112,17 @@ where let (header, body) = block.split_sealed_header_body(); let block = P::Block::new_sealed(header, body).with_senders(senders); - executor.execute_and_verify_one(&block)?; + results.push(executor.execute_one(&block)?); execution_duration += execute_start.elapsed(); // TODO(alexey): report gas metrics using `block.header.gas_used` // Seal the block back and save it blocks.push(block); - // Check if we should commit now - let bundle_size_hint = executor.size_hint().unwrap_or_default() as u64; if self.thresholds.is_end_of_batch( block_number - *self.range.start(), - bundle_size_hint, + executor.size_hint() as u64, cumulative_gas, batch_start.elapsed(), ) { @@ -130,6 +130,7 @@ where } } + let first_block_number = blocks.first().expect("blocks should not be empty").number(); let last_block_number = blocks.last().expect("blocks should not be empty").number(); debug!( target: "exex::backfill", @@ -141,7 +142,12 @@ where ); self.range = last_block_number + 1..=*self.range.end(); - let chain = Chain::new(blocks, executor.finalize(), None); + let outcome = ExecutionOutcome::from_blocks( + first_block_number, + executor.into_state().take_bundle(), + results, + ); + let chain = Chain::new(blocks, outcome, None); Ok(chain) } } diff --git a/crates/exex/exex/src/backfill/test_utils.rs b/crates/exex/exex/src/backfill/test_utils.rs index a10a2785e..c849c0b3a 100644 --- 
a/crates/exex/exex/src/backfill/test_utils.rs +++ b/crates/exex/exex/src/backfill/test_utils.rs @@ -4,7 +4,7 @@ use alloy_consensus::{constants::ETH_TO_WEI, BlockHeader, Header, TxEip2930}; use alloy_genesis::{Genesis, GenesisAccount}; use alloy_primitives::{b256, Address, TxKind, U256}; use reth_chainspec::{ChainSpec, ChainSpecBuilder, EthereumHardfork, MAINNET, MIN_TRANSACTION_GAS}; -use reth_evm::execute::{BatchExecutor, BlockExecutionOutput, BlockExecutorProvider, Executor}; +use reth_evm::execute::{BlockExecutionOutput, BlockExecutorProvider, Executor}; use reth_evm_ethereum::execute::EthExecutorProvider; use reth_node_api::FullNodePrimitives; use reth_primitives::{Block, BlockBody, Receipt, RecoveredBlock, Transaction}; @@ -195,9 +195,9 @@ where let provider = provider_factory.provider()?; let executor = EthExecutorProvider::ethereum(chain_spec) - .batch_executor(StateProviderDatabase::new(LatestStateProviderRef::new(&provider))); + .executor(StateProviderDatabase::new(LatestStateProviderRef::new(&provider))); - let mut execution_outcome = executor.execute_and_verify_batch(vec![&block1, &block2])?; + let mut execution_outcome = executor.execute_batch(vec![&block1, &block2])?; execution_outcome.state_mut().reverts.sort(); // Commit the block's execution outcome to the database diff --git a/crates/node/builder/src/setup.rs b/crates/node/builder/src/setup.rs index 9df8521af..27a7cbed9 100644 --- a/crates/node/builder/src/setup.rs +++ b/crates/node/builder/src/setup.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use crate::BlockTy; use alloy_primitives::{BlockNumber, B256}; use reth_config::{config::StageConfig, PruneConfig}; -use reth_consensus::{Consensus, ConsensusError}; +use reth_consensus::{ConsensusError, FullConsensus}; use reth_downloaders::{ bodies::bodies::BodiesDownloaderBuilder, headers::reverse_headers::ReverseHeadersDownloaderBuilder, @@ -28,7 +28,7 @@ use tokio::sync::watch; pub fn build_networked_pipeline( config: &StageConfig, client: Client, - consensus: Arc, Error = ConsensusError>>, + consensus: Arc>, provider_factory: ProviderFactory, task_executor: &TaskExecutor, metrics_tx: reth_stages::MetricEventsSender, @@ -49,7 +49,7 @@ where .into_task_with(task_executor); let body_downloader = BodiesDownloaderBuilder::new(config.bodies) - .build(client, Arc::clone(&consensus), provider_factory.clone()) + .build(client, consensus.clone().as_consensus(), provider_factory.clone()) .into_task_with(task_executor); let pipeline = build_pipeline( @@ -76,7 +76,7 @@ pub fn build_pipeline( stage_config: &StageConfig, header_downloader: H, body_downloader: B, - consensus: Arc, Error = ConsensusError>>, + consensus: Arc>, max_block: Option, metrics_tx: reth_stages::MetricEventsSender, prune_config: Option, @@ -117,6 +117,7 @@ where ) .set(ExecutionStage::new( executor, + consensus, stage_config.execution.into(), stage_config.execution_external_clean_threshold(), exex_manager_handle, diff --git a/crates/optimism/cli/Cargo.toml b/crates/optimism/cli/Cargo.toml index 5d45beb10..2d53e33f4 100644 --- a/crates/optimism/cli/Cargo.toml +++ b/crates/optimism/cli/Cargo.toml @@ -34,6 +34,7 @@ reth-node-metrics.workspace = true ## optimism reth-optimism-primitives.workspace = true reth-optimism-chainspec.workspace = true +reth-optimism-consensus.workspace = true reth-chainspec.workspace = true reth-node-events.workspace = true @@ -84,6 +85,7 @@ optimism = [ "reth-db-api/optimism", "reth-optimism-primitives/optimism", "reth-downloaders/optimism", + "reth-optimism-consensus/optimism", ] asm-keccak = [ 
"alloy-primitives/asm-keccak", diff --git a/crates/optimism/cli/src/lib.rs b/crates/optimism/cli/src/lib.rs index b047218df..21fb46b78 100644 --- a/crates/optimism/cli/src/lib.rs +++ b/crates/optimism/cli/src/lib.rs @@ -51,6 +51,7 @@ use reth_node_core::{ args::LogArgs, version::{LONG_VERSION, SHORT_VERSION}, }; +use reth_optimism_consensus::OpBeaconConsensus; use reth_optimism_evm::OpExecutorProvider; use reth_optimism_node::{OpNetworkPrimitives, OpNode}; use reth_tracing::FileWorkerGuard; @@ -169,8 +170,9 @@ where Commands::DumpGenesis(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::Db(command) => runner.run_blocking_until_ctrl_c(command.execute::()), Commands::Stage(command) => runner.run_command_until_exit(|ctx| { - command - .execute::(ctx, OpExecutorProvider::optimism) + command.execute::(ctx, |spec| { + (OpExecutorProvider::optimism(spec.clone()), OpBeaconConsensus::new(spec)) + }) }), Commands::P2P(command) => { runner.run_until_ctrl_c(command.execute::()) diff --git a/crates/optimism/evm/src/execute.rs b/crates/optimism/evm/src/execute.rs index 302b3febb..e17f1d6a8 100644 --- a/crates/optimism/evm/src/execute.rs +++ b/crates/optimism/evm/src/execute.rs @@ -329,7 +329,7 @@ mod tests { }; use op_alloy_consensus::{OpTypedTransaction, TxDeposit}; use reth_chainspec::MIN_TRANSACTION_GAS; - use reth_evm::execute::{BasicBlockExecutorProvider, BatchExecutor, BlockExecutorProvider}; + use reth_evm::execute::{BasicBlockExecutorProvider, BlockExecutorProvider, Executor}; use reth_optimism_chainspec::OpChainSpecBuilder; use reth_optimism_primitives::{OpReceipt, OpTransactionSigned}; use reth_primitives_traits::Account; @@ -416,7 +416,7 @@ mod tests { ); let provider = executor_provider(chain_spec); - let mut executor = provider.batch_executor(StateProviderDatabase::new(&db)); + let mut executor = provider.executor(StateProviderDatabase::new(&db)); // make sure the L1 block contract state is preloaded. executor.with_state_mut(|state| { @@ -424,8 +424,8 @@ mod tests { }); // Attempt to execute a block with one deposit and one non-deposit transaction - executor - .execute_and_verify_one(&RecoveredBlock::new_unhashed( + let output = executor + .execute(&RecoveredBlock::new_unhashed( Block { header, body: BlockBody { transactions: vec![tx, tx_deposit], ..Default::default() }, @@ -434,9 +434,9 @@ mod tests { )) .unwrap(); - let receipts = executor.receipts(); - let tx_receipt = &receipts[0][0]; - let deposit_receipt = &receipts[0][1]; + let receipts = output.receipts; + let tx_receipt = &receipts[0]; + let deposit_receipt = &receipts[1]; assert!(!matches!(tx_receipt, OpReceipt::Deposit(_))); // deposit_nonce is present only in deposit transactions @@ -492,7 +492,7 @@ mod tests { ); let provider = executor_provider(chain_spec); - let mut executor = provider.batch_executor(StateProviderDatabase::new(&db)); + let mut executor = provider.executor(StateProviderDatabase::new(&db)); // make sure the L1 block contract state is preloaded. 
executor.with_state_mut(|state| { @@ -500,8 +500,8 @@ mod tests { }); // attempt to execute an empty block with parent beacon block root, this should not fail - executor - .execute_and_verify_one(&RecoveredBlock::new_unhashed( + let output = executor + .execute(&RecoveredBlock::new_unhashed( Block { header, body: BlockBody { transactions: vec![tx, tx_deposit], ..Default::default() }, @@ -510,9 +510,9 @@ mod tests { )) .expect("Executing a block while canyon is active should not fail"); - let receipts = executor.receipts(); - let tx_receipt = &receipts[0][0]; - let deposit_receipt = &receipts[0][1]; + let receipts = output.receipts; + let tx_receipt = &receipts[0]; + let deposit_receipt = &receipts[1]; // deposit_receipt_version is set to 1 for post canyon deposit transactions assert!(!matches!(tx_receipt, OpReceipt::Deposit(_))); diff --git a/crates/stages/stages/Cargo.toml b/crates/stages/stages/Cargo.toml index 7b7a9532c..4cfa59850 100644 --- a/crates/stages/stages/Cargo.toml +++ b/crates/stages/stages/Cargo.toml @@ -66,6 +66,7 @@ reth-chainspec.workspace = true reth-primitives = { workspace = true, features = ["test-utils", "arbitrary"] } reth-db = { workspace = true, features = ["test-utils", "mdbx"] } reth-evm-ethereum.workspace = true +reth-ethereum-consensus.workspace = true reth-execution-errors.workspace = true reth-consensus = { workspace = true, features = ["test-utils"] } reth-network-p2p = { workspace = true, features = ["test-utils"] } diff --git a/crates/stages/stages/src/lib.rs b/crates/stages/stages/src/lib.rs index 76b88e391..0359537d1 100644 --- a/crates/stages/stages/src/lib.rs +++ b/crates/stages/stages/src/lib.rs @@ -32,9 +32,10 @@ //! # use reth_config::config::StageConfig; //! # use reth_consensus::{Consensus, ConsensusError}; //! # use reth_consensus::test_utils::TestConsensus; +//! # use reth_consensus::FullConsensus; //! # //! # let chain_spec = MAINNET.clone(); -//! # let consensus: Arc> = Arc::new(TestConsensus::default()); +//! # let consensus: Arc> = Arc::new(TestConsensus::default()); //! # let headers_downloader = ReverseHeadersDownloaderBuilder::default().build( //! # Arc::new(TestHeadersClient::default()), //! # consensus.clone().as_header_validator() @@ -42,7 +43,7 @@ //! # let provider_factory = create_test_provider_factory(); //! # let bodies_downloader = BodiesDownloaderBuilder::default().build( //! # Arc::new(TestBodiesClient { responder: |_| Ok((PeerId::ZERO, vec![]).into()) }), -//! # consensus.clone(), +//! # consensus.clone().as_consensus(), //! # provider_factory.clone() //! # ); //! # let (tip_tx, tip_rx) = watch::channel(B256::default()); diff --git a/crates/stages/stages/src/sets.rs b/crates/stages/stages/src/sets.rs index 2f7be1791..8f9c9a768 100644 --- a/crates/stages/stages/src/sets.rs +++ b/crates/stages/stages/src/sets.rs @@ -21,15 +21,17 @@ //! # use reth_config::config::StageConfig; //! # use reth_evm::execute::BlockExecutorProvider; //! # use reth_primitives::EthPrimitives; +//! # use std::sync::Arc; +//! # use reth_consensus::{FullConsensus, ConsensusError}; //! -//! # fn create(exec: impl BlockExecutorProvider) { +//! # fn create(exec: impl BlockExecutorProvider, consensus: impl FullConsensus + 'static) { //! //! let provider_factory = create_test_provider_factory(); //! let static_file_producer = //! StaticFileProducer::new(provider_factory.clone(), PruneModes::default()); //! // Build a pipeline with all offline stages. //! let pipeline = Pipeline::::builder() -//! 
.add_stages(OfflineStages::new(exec, StageConfig::default(), PruneModes::default())) +//! .add_stages(OfflineStages::new(exec, Arc::new(consensus), StageConfig::default(), PruneModes::default())) //! .build(provider_factory, static_file_producer); //! //! # } @@ -44,9 +46,10 @@ use crate::{ }; use alloy_primitives::B256; use reth_config::config::StageConfig; -use reth_consensus::{Consensus, ConsensusError}; +use reth_consensus::{Consensus, ConsensusError, FullConsensus}; use reth_evm::execute::BlockExecutorProvider; use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader}; +use reth_primitives::NodePrimitives; use reth_primitives_traits::Block; use reth_provider::HeaderSyncGapProvider; use reth_prune_types::PruneModes; @@ -78,15 +81,18 @@ use tokio::sync::watch; /// - [`PruneStage`] (execute) /// - [`FinishStage`] #[derive(Debug)] -pub struct DefaultStages +pub struct DefaultStages where H: HeaderDownloader, B: BodyDownloader, + E: BlockExecutorProvider, { /// Configuration for the online stages online: OnlineStages, /// Executor factory needs for execution stage - executor_factory: EF, + executor_provider: E, + /// Consensus instance + consensus: Arc>, /// Configuration for each stage in the pipeline stages_config: StageConfig, /// Prune configuration for every segment that can be pruned @@ -97,32 +103,31 @@ impl DefaultStages where H: HeaderDownloader, B: BodyDownloader, + E: BlockExecutorProvider>, { /// Create a new set of default stages with default values. #[allow(clippy::too_many_arguments)] pub fn new( provider: Provider, tip: watch::Receiver, - consensus: Arc>, + consensus: Arc>, header_downloader: H, body_downloader: B, - executor_factory: E, + executor_provider: E, stages_config: StageConfig, prune_modes: PruneModes, - ) -> Self - where - E: BlockExecutorProvider, - { + ) -> Self { Self { online: OnlineStages::new( provider, tip, - consensus, + consensus.clone().as_consensus(), header_downloader, body_downloader, stages_config.clone(), ), - executor_factory, + executor_provider, + consensus, stages_config, prune_modes, } @@ -138,7 +143,8 @@ where /// Appends the default offline stages and default finish stage to the given builder. pub fn add_offline_stages( default_offline: StageSetBuilder, - executor_factory: E, + executor_provider: E, + consensus: Arc>, stages_config: StageConfig, prune_modes: PruneModes, ) -> StageSetBuilder @@ -147,7 +153,7 @@ where { StageSetBuilder::default() .add_set(default_offline) - .add_set(OfflineStages::new(executor_factory, stages_config, prune_modes)) + .add_set(OfflineStages::new(executor_provider, consensus, stages_config, prune_modes)) .add_stage(FinishStage) } } @@ -164,7 +170,8 @@ where fn builder(self) -> StageSetBuilder { Self::add_offline_stages( self.online.builder(), - self.executor_factory, + self.executor_provider, + self.consensus, self.stages_config.clone(), self.prune_modes, ) @@ -286,25 +293,28 @@ where /// - [`HashingStages`] /// - [`HistoryIndexingStages`] /// - [`PruneStage`] -#[derive(Debug, Default)] +#[derive(Debug)] #[non_exhaustive] -pub struct OfflineStages { +pub struct OfflineStages { /// Executor factory needs for execution stage - executor_factory: EF, + executor_provider: E, + /// Consensus instance for validating blocks. 
+    consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
     /// Configuration for each stage in the pipeline
     stages_config: StageConfig,
     /// Prune configuration for every segment that can be pruned
     prune_modes: PruneModes,
 }
 
-impl<EF> OfflineStages<EF> {
+impl<E: BlockExecutorProvider> OfflineStages<E> {
     /// Create a new set of offline stages with default values.
     pub const fn new(
-        executor_factory: EF,
+        executor_provider: E,
+        consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
         stages_config: StageConfig,
         prune_modes: PruneModes,
     ) -> Self {
-        Self { executor_factory, stages_config, prune_modes }
+        Self { executor_provider, consensus, stages_config, prune_modes }
     }
 }
@@ -318,7 +328,7 @@ where
     PruneStage: Stage<Provider>,
 {
     fn builder(self) -> StageSetBuilder<Provider> {
-        ExecutionStages::new(self.executor_factory, self.stages_config.clone())
+        ExecutionStages::new(self.executor_provider, self.consensus, self.stages_config.clone())
             .builder()
             // If sender recovery prune mode is set, add the prune sender recovery stage.
             .add_stage_opt(self.prune_modes.sender_recovery.map(|prune_mode| {
@@ -341,17 +351,23 @@ where
 /// A set containing all stages that are required to execute pre-existing block data.
 #[derive(Debug)]
 #[non_exhaustive]
-pub struct ExecutionStages<E> {
+pub struct ExecutionStages<E: BlockExecutorProvider> {
     /// Executor factory that will create executors.
-    executor_factory: E,
+    executor_provider: E,
+    /// Consensus instance for validating blocks.
+    consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
     /// Configuration for each stage in the pipeline
     stages_config: StageConfig,
 }
 
-impl<E> ExecutionStages<E> {
+impl<E: BlockExecutorProvider> ExecutionStages<E> {
     /// Create a new set of execution stages with default values.
-    pub const fn new(executor_factory: E, stages_config: StageConfig) -> Self {
-        Self { executor_factory, stages_config }
+    pub const fn new(
+        executor_provider: E,
+        consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
+        stages_config: StageConfig,
+    ) -> Self {
+        Self { executor_provider, consensus, stages_config }
     }
 }
@@ -365,7 +381,8 @@ where
         StageSetBuilder::default()
             .add_stage(SenderRecoveryStage::new(self.stages_config.sender_recovery))
             .add_stage(ExecutionStage::from_config(
-                self.executor_factory,
+                self.executor_provider,
+                self.consensus,
                 self.stages_config.execution,
                 self.stages_config.execution_external_clean_threshold(),
             ))
diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs
index ddd811a29..a6c67bf55 100644
--- a/crates/stages/stages/src/stages/execution.rs
+++ b/crates/stages/stages/src/stages/execution.rs
@@ -4,9 +4,10 @@ use alloy_eips::{eip1898::BlockWithParent, NumHash};
 use alloy_primitives::BlockNumber;
 use num_traits::Zero;
 use reth_config::config::ExecutionConfig;
+use reth_consensus::{ConsensusError, FullConsensus, PostExecutionInput};
 use reth_db::{static_file::HeaderMask, tables};
 use reth_evm::{
-    execute::{BatchExecutor, BlockExecutorProvider},
+    execute::{BlockExecutorProvider, Executor},
     metrics::ExecutorMetrics,
 };
 use reth_execution_types::Chain;
@@ -15,9 +16,9 @@ use reth_primitives::StaticFileSegment;
 use reth_primitives_traits::{format_gas_throughput, Block, BlockBody, NodePrimitives};
 use reth_provider::{
     providers::{StaticFileProvider, StaticFileWriter},
-    BlockHashReader, BlockReader, DBProvider, HeaderProvider, LatestStateProviderRef,
-    OriginalValuesKnown, ProviderError, StateCommitmentProvider, StateWriter,
-    StaticFileProviderFactory, StatsReader, StorageLocation, TransactionVariant,
+    BlockHashReader, BlockReader, DBProvider, ExecutionOutcome, HeaderProvider,
+    LatestStateProviderRef, OriginalValuesKnown, ProviderError, StateCommitmentProvider,
+    StateWriter, StaticFileProviderFactory, StatsReader, StorageLocation, TransactionVariant,
 };
 use reth_revm::database::StateProviderDatabase;
 use reth_stages_api::{
@@ -72,6 +73,9 @@ where
 {
     /// The stage's internal block executor
     executor_provider: E,
+    /// The consensus instance for validating blocks.
+    consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
+    /// The consensu
     /// The commit thresholds of the execution stage.
     thresholds: ExecutionStageThresholds,
     /// The highest threshold (in number of blocks) for switching between incremental
@@ -100,6 +104,7 @@ where
     /// Create new execution stage with specified config.
     pub fn new(
         executor_provider: E,
+        consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
         thresholds: ExecutionStageThresholds,
         external_clean_threshold: u64,
         exex_manager_handle: ExExManagerHandle<E::Primitives>,
@@ -107,6 +112,7 @@ where
         Self {
             external_clean_threshold,
             executor_provider,
+            consensus,
             thresholds,
             post_execute_commit_input: None,
             post_unwind_commit_input: None,
@@ -118,9 +124,13 @@ where
     /// Create an execution stage with the provided executor.
     ///
     /// The commit threshold will be set to [`MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD`].
-    pub fn new_with_executor(executor_provider: E) -> Self {
+    pub fn new_with_executor(
+        executor_provider: E,
+        consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
+    ) -> Self {
         Self::new(
             executor_provider,
+            consensus,
             ExecutionStageThresholds::default(),
             MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
             ExExManagerHandle::empty(),
@@ -130,11 +140,13 @@ where
     /// Create new instance of [`ExecutionStage`] from configuration.
     pub fn from_config(
         executor_provider: E,
+        consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
         config: ExecutionConfig,
         external_clean_threshold: u64,
     ) -> Self {
         Self::new(
             executor_provider,
+            consensus,
             config.into(),
             external_clean_threshold,
             ExExManagerHandle::empty(),
@@ -283,7 +295,7 @@ where
         self.ensure_consistency(provider, input.checkpoint().block_number, None)?;
 
         let db = StateProviderDatabase(LatestStateProviderRef::new(provider));
-        let mut executor = self.executor_provider.batch_executor(db);
+        let mut executor = self.executor_provider.executor(db);
 
         // Progress tracking
         let mut stage_progress = start_block;
@@ -310,6 +322,7 @@ where
         let batch_start = Instant::now();
 
         let mut blocks = Vec::new();
+        let mut results = Vec::new();
         for block_number in start_block..=max_block {
             // Fetch the block
             let fetch_block_start = Instant::now();
@@ -329,8 +342,8 @@ where
 
             // Execute the block
             let execute_start = Instant::now();
-            self.metrics.metered_one(&block, |input| {
-                executor.execute_and_verify_one(input).map_err(|error| {
+            let result = self.metrics.metered_one(&block, |input| {
+                executor.execute_one(input).map_err(|error| {
                     let header = block.header();
                     StageError::Block {
                         block: Box::new(BlockWithParent::new(
@@ -342,6 +355,20 @@ where
                 })
             })?;
 
+            if let Err(err) = self.consensus.validate_block_post_execution(
+                &block,
+                PostExecutionInput::new(&result.receipts, &result.requests),
+            ) {
+                return Err(StageError::Block {
+                    block: Box::new(BlockWithParent::new(
+                        block.header().parent_hash(),
+                        NumHash::new(block.header().number(), block.hash_slow()),
+                    )),
+                    error: BlockErrorKind::Validation(err),
+                })
+            }
+            results.push(result);
+
             execution_duration += execute_start.elapsed();
 
             // Log execution throughput
@@ -369,10 +396,9 @@ where
             }
 
             // Check if we should commit now
-            let bundle_size_hint = executor.size_hint().unwrap_or_default() as u64;
             if self.thresholds.is_end_of_batch(
                 block_number - start_block,
-                bundle_size_hint,
+                executor.size_hint() as u64,
                 cumulative_gas,
                 batch_start.elapsed(),
             ) {
@@ -382,7 +408,11 @@ where
 
             // prepare execution output for writing
             let time = Instant::now();
-            let mut state = executor.finalize();
+            let mut state = ExecutionOutcome::from_blocks(
+                start_block,
+                executor.into_state().take_bundle(),
+                results,
+            );
             let write_preparation_duration = time.elapsed();
 
             // log the gas per second for the range we just executed
@@ -649,6 +679,7 @@ mod tests {
     use reth_chainspec::ChainSpecBuilder;
     use reth_db::transaction::DbTx;
     use reth_db_api::{models::AccountBeforeTx, transaction::DbTxMut};
+    use reth_ethereum_consensus::EthBeaconConsensus;
     use reth_evm::execute::BasicBlockExecutorProvider;
     use reth_evm_ethereum::execute::EthExecutionStrategyFactory;
     use reth_primitives::{Account, Bytecode, SealedBlock, StorageEntry};
@@ -666,8 +697,12 @@ mod tests {
             ChainSpecBuilder::mainnet().berlin_activated().build(),
         ));
         let executor_provider = BasicBlockExecutorProvider::new(strategy_factory);
+        let consensus = Arc::new(EthBeaconConsensus::new(Arc::new(
+            ChainSpecBuilder::mainnet().berlin_activated().build(),
+        )));
         ExecutionStage::new(
             executor_provider,
+            consensus,
             ExecutionStageThresholds {
                 max_blocks: Some(100),
                 max_changes: None,
diff --git a/crates/stages/stages/src/stages/mod.rs b/crates/stages/stages/src/stages/mod.rs
index 2145ba94c..9a2fb6552 100644
--- a/crates/stages/stages/src/stages/mod.rs
+++ b/crates/stages/stages/src/stages/mod.rs
@@ -57,6 +57,7 @@ mod tests {
         table::Table,
         transaction::{DbTx, DbTxMut},
     };
+    use reth_ethereum_consensus::EthBeaconConsensus;
     use reth_evm_ethereum::execute::EthExecutorProvider;
     use reth_exex::ExExManagerHandle;
     use reth_primitives::{Account, Bytecode, SealedBlock, StaticFileSegment};
@@ -152,6 +153,9 @@ mod tests {
             EthExecutorProvider::ethereum(Arc::new(
                 ChainSpecBuilder::mainnet().berlin_activated().build(),
             )),
+            Arc::new(EthBeaconConsensus::new(Arc::new(
+                ChainSpecBuilder::mainnet().berlin_activated().build(),
+            ))),
             ExecutionStageThresholds {
                 max_blocks: Some(100),
                 max_changes: None,
diff --git a/testing/ef-tests/Cargo.toml b/testing/ef-tests/Cargo.toml
index 2ca705830..d75dd7e5f 100644
--- a/testing/ef-tests/Cargo.toml
+++ b/testing/ef-tests/Cargo.toml
@@ -27,6 +27,7 @@ reth-db-api.workspace = true
 reth-provider = { workspace = true, features = ["test-utils"] }
 reth-stages.workspace = true
 reth-evm-ethereum.workspace = true
+reth-ethereum-consensus.workspace = true
 reth-revm = { workspace = true, features = ["std"] }
 revm = { workspace = true, features = ["secp256k1", "blst", "c-kzg"] }
diff --git a/testing/ef-tests/src/cases/blockchain_test.rs b/testing/ef-tests/src/cases/blockchain_test.rs
index 85e7eb9dc..7fb7cc73f 100644
--- a/testing/ef-tests/src/cases/blockchain_test.rs
+++ b/testing/ef-tests/src/cases/blockchain_test.rs
@@ -7,6 +7,7 @@ use crate::{
 use alloy_rlp::Decodable;
 use rayon::iter::{ParallelBridge, ParallelIterator};
 use reth_chainspec::ChainSpec;
+use reth_ethereum_consensus::EthBeaconConsensus;
 use reth_primitives::{BlockBody, SealedBlock, StaticFileSegment};
 use reth_provider::{
     providers::StaticFileWriter, test_utils::create_test_provider_factory_with_chain_spec,
@@ -126,7 +127,8 @@ impl Case for BlockchainTestCase {
                 // Execute the execution stage using the EVM processor factory for the test case
                 // network.
                 let _ = ExecutionStage::new_with_executor(
-                    reth_evm_ethereum::execute::EthExecutorProvider::ethereum(chain_spec),
+                    reth_evm_ethereum::execute::EthExecutorProvider::ethereum(chain_spec.clone()),
+                    Arc::new(EthBeaconConsensus::new(chain_spec)),
                 )
                 .execute(
                     &provider,