diff --git a/Cargo.lock b/Cargo.lock index 595da4031..2389ae8b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4369,6 +4369,7 @@ dependencies = [ "reth-db", "reth-discv4", "reth-downloaders", + "reth-executor", "reth-interfaces", "reth-net-nat", "reth-network", diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index abe8a6e47..11328d60c 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -17,6 +17,7 @@ reth-stages = { path = "../../crates/stages"} reth-interfaces = { path = "../../crates/interfaces", features = ["test-utils"] } reth-transaction-pool = { path = "../../crates/transaction-pool", features = ["test-utils"] } reth-consensus = { path = "../../crates/consensus" } +reth-executor = { path = "../../crates/executor" } reth-rpc-engine-api = { path = "../../crates/rpc/rpc-engine-api" } reth-rpc-builder = { path = "../../crates/rpc/rpc-builder" } reth-rpc = { path = "../../crates/rpc/rpc" } diff --git a/bin/reth/src/chain/import.rs b/bin/reth/src/chain/import.rs index 967ea2fa1..a694ba200 100644 --- a/bin/reth/src/chain/import.rs +++ b/bin/reth/src/chain/import.rs @@ -27,7 +27,6 @@ use reth_staged_sync::{ use reth_stages::{ prelude::*, stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage}, - DefaultDB, }; use std::sync::Arc; use tracing::{debug, info}; @@ -141,6 +140,8 @@ impl ImportCommand { .build(file_client.clone(), consensus.clone(), db) .into_task(); + let factory = reth_executor::Factory::new(Arc::new(self.chain.clone())); + let mut pipeline = Pipeline::builder() .with_sync_state_updater(file_client) .add_stages( @@ -149,6 +150,7 @@ impl ImportCommand { header_downloader, body_downloader, NoopStatusUpdater::default(), + factory.clone(), ) .set(TotalDifficultyStage { chain_spec: self.chain.clone(), @@ -157,12 +159,7 @@ impl ImportCommand { .set(SenderRecoveryStage { commit_threshold: config.stages.sender_recovery.commit_threshold, }) - .set({ - let mut stage: ExecutionStage<'_, DefaultDB<'_>> = - ExecutionStage::from(self.chain.clone()); - stage.commit_threshold = config.stages.execution.commit_threshold; - stage - }), + .set(ExecutionStage::new(factory, config.stages.execution.commit_threshold)), ) .with_max_block(0) .build(); diff --git a/bin/reth/src/dump_stage/execution.rs b/bin/reth/src/dump_stage/execution.rs index 5a5ae04de..7e164d9eb 100644 --- a/bin/reth/src/dump_stage/execution.rs +++ b/bin/reth/src/dump_stage/execution.rs @@ -9,8 +9,8 @@ use reth_db::{ }; use reth_primitives::MAINNET; use reth_provider::Transaction; -use reth_stages::{stages::ExecutionStage, DefaultDB, Stage, StageId, UnwindInput}; -use std::ops::DerefMut; +use reth_stages::{stages::ExecutionStage, Stage, StageId, UnwindInput}; +use std::{ops::DerefMut, sync::Arc}; use tracing::info; pub(crate) async fn dump_execution_stage( @@ -97,7 +97,10 @@ async fn unwind_and_copy( output_db: &reth_db::mdbx::Env, ) -> eyre::Result<()> { let mut unwind_tx = Transaction::new(db_tool.db)?; - let mut exec_stage: ExecutionStage<'_, DefaultDB<'_>> = ExecutionStage::from(MAINNET.clone()); + + let mut exec_stage = ExecutionStage::new_default_threshold(reth_executor::Factory::new( + Arc::new(MAINNET.clone()), + )); exec_stage .unwind( @@ -126,7 +129,9 @@ async fn dry_run( info!(target: "reth::cli", "Executing stage. [dry-run]"); let mut tx = Transaction::new(&output_db)?; - let mut exec_stage: ExecutionStage<'_, DefaultDB<'_>> = ExecutionStage::from(MAINNET.clone()); + let mut exec_stage = ExecutionStage::new_default_threshold(reth_executor::Factory::new( + Arc::new(MAINNET.clone()), + )); exec_stage .execute( diff --git a/bin/reth/src/dump_stage/merkle.rs b/bin/reth/src/dump_stage/merkle.rs index 8e4b2b561..a9e8a8f62 100644 --- a/bin/reth/src/dump_stage/merkle.rs +++ b/bin/reth/src/dump_stage/merkle.rs @@ -9,9 +9,9 @@ use reth_primitives::MAINNET; use reth_provider::Transaction; use reth_stages::{ stages::{AccountHashingStage, ExecutionStage, MerkleStage, StorageHashingStage}, - DefaultDB, Stage, StageId, UnwindInput, + Stage, StageId, UnwindInput, }; -use std::ops::DerefMut; +use std::{ops::DerefMut, sync::Arc}; use tracing::info; pub(crate) async fn dump_merkle_stage( @@ -75,7 +75,10 @@ async fn unwind_and_copy( MerkleStage::default_unwind().unwind(&mut unwind_tx, unwind).await?; // Bring Plainstate to TO (hashing stage execution requires it) - let mut exec_stage: ExecutionStage<'_, DefaultDB<'_>> = ExecutionStage::from(MAINNET.clone()); + let mut exec_stage = ExecutionStage::new_default_threshold(reth_executor::Factory::new( + Arc::new(MAINNET.clone()), + )); + exec_stage.commit_threshold = u64::MAX; exec_stage .unwind( diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 5852caec6..1cd2a6b2a 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -50,7 +50,6 @@ use reth_staged_sync::{ use reth_stages::{ prelude::*, stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage, FINISH}, - DefaultDB, }; use reth_tasks::TaskExecutor; use std::{net::SocketAddr, path::PathBuf, sync::Arc}; @@ -444,23 +443,25 @@ impl Command { builder = builder.with_max_block(max_block) } + let factory = reth_executor::Factory::new(Arc::new(self.chain.clone())); let pipeline = builder .with_sync_state_updater(updater.clone()) .add_stages( - DefaultStages::new(consensus.clone(), header_downloader, body_downloader, updater) - .set(TotalDifficultyStage { - chain_spec: self.chain.clone(), - commit_threshold: stage_conf.total_difficulty.commit_threshold, - }) - .set(SenderRecoveryStage { - commit_threshold: stage_conf.sender_recovery.commit_threshold, - }) - .set({ - let mut stage: ExecutionStage<'_, DefaultDB<'_>> = - ExecutionStage::from(self.chain.clone()); - stage.commit_threshold = stage_conf.execution.commit_threshold; - stage - }), + DefaultStages::new( + consensus.clone(), + header_downloader, + body_downloader, + updater, + factory.clone(), + ) + .set(TotalDifficultyStage { + chain_spec: self.chain.clone(), + commit_threshold: stage_conf.total_difficulty.commit_threshold, + }) + .set(SenderRecoveryStage { + commit_threshold: stage_conf.sender_recovery.commit_threshold, + }) + .set(ExecutionStage::new(factory, stage_conf.execution.commit_threshold)), ) .build(); diff --git a/bin/reth/src/stage/mod.rs b/bin/reth/src/stage/mod.rs index 27f82e25d..5c9ab23fd 100644 --- a/bin/reth/src/stage/mod.rs +++ b/bin/reth/src/stage/mod.rs @@ -17,7 +17,7 @@ use reth_staged_sync::{ }; use reth_stages::{ stages::{BodyStage, ExecutionStage, SenderRecoveryStage}, - DefaultDB, ExecInput, Stage, StageId, UnwindInput, + ExecInput, Stage, StageId, UnwindInput, }; use std::{net::SocketAddr, sync::Arc}; use tracing::*; @@ -171,7 +171,8 @@ impl Command { stage.execute(&mut tx, input).await?; } StageEnum::Execution => { - let mut stage = ExecutionStage::>::from(self.chain.clone()); + let factory = reth_executor::Factory::new(Arc::new(self.chain.clone())); + let mut stage = ExecutionStage::new(factory, 10_000); stage.commit_threshold = num_blocks; if !self.skip_unwind { stage.unwind(&mut tx, unwind).await?; diff --git a/bin/reth/src/test_eth_chain/runner.rs b/bin/reth/src/test_eth_chain/runner.rs index 2e1b2f992..cae43ba4e 100644 --- a/bin/reth/src/test_eth_chain/runner.rs +++ b/bin/reth/src/test_eth_chain/runner.rs @@ -15,11 +15,12 @@ use reth_primitives::{ }; use reth_provider::Transaction; use reth_rlp::Decodable; -use reth_stages::{stages::ExecutionStage, DefaultDB, ExecInput, Stage, StageId}; +use reth_stages::{stages::ExecutionStage, ExecInput, Stage, StageId}; use std::{ collections::HashMap, ffi::OsStr, path::{Path, PathBuf}, + sync::Arc, }; use tracing::{debug, trace}; @@ -193,7 +194,8 @@ pub async fn run_test(path: PathBuf) -> eyre::Result { // Initialize the execution stage // Hardcode the chain_id to Ethereum 1. - let mut stage = ExecutionStage::>::from(chain_spec); + let factory = reth_executor::Factory::new(Arc::new(chain_spec)); + let mut stage = ExecutionStage::new(factory, 1_000); // Call execution stage let input = ExecInput { diff --git a/crates/executor/src/executor.rs b/crates/executor/src/executor.rs index 9ce50bc0a..03d28ee9c 100644 --- a/crates/executor/src/executor.rs +++ b/crates/executor/src/executor.rs @@ -1,14 +1,12 @@ -use reth_interfaces::executor::{BlockExecutor, Error}; +use crate::execution_result::{ + AccountChangeSet, AccountInfoChangeSet, ExecutionResult, TransactionChangeSet, +}; +use reth_interfaces::executor::Error; use reth_primitives::{ bloom::logs_bloom, Account, Address, Block, Bloom, ChainSpec, Hardfork, Header, Log, Receipt, TransactionSigned, H256, U256, }; -use reth_provider::{ - execution_result::{ - AccountChangeSet, AccountInfoChangeSet, ExecutionResult, TransactionChangeSet, - }, - StateProvider, -}; +use reth_provider::{BlockExecutor, StateProvider}; use reth_revm::{ config::{WEI_2ETH, WEI_3ETH, WEI_5ETH}, database::SubState, @@ -30,38 +28,34 @@ use std::{ }; /// Main block executor -pub struct Executor<'a, DB> +pub struct Executor where DB: StateProvider, { /// The configured chain-spec pub chain_spec: Arc, - evm: EVM<&'a mut SubState>, + evm: EVM>, stack: InspectorStack, } -impl<'a, DB> From for Executor<'a, DB> +impl From> for Executor where DB: StateProvider, { /// Instantiates a new executor from the chainspec. Must call /// `with_db` to set the database before executing. - fn from(chain_spec: ChainSpec) -> Self { + fn from(chain_spec: Arc) -> Self { let evm = EVM::new(); - Executor { - chain_spec: Arc::new(chain_spec), - evm, - stack: InspectorStack::new(InspectorStackConfig::default()), - } + Executor { chain_spec, evm, stack: InspectorStack::new(InspectorStackConfig::default()) } } } -impl<'a, DB> Executor<'a, DB> +impl Executor where DB: StateProvider, { /// Creates a new executor from the given chain spec and database. - pub fn new(chain_spec: Arc, db: &'a mut SubState) -> Self { + pub fn new(chain_spec: Arc, db: SubState) -> Self { let mut evm = EVM::new(); evm.database(db); @@ -79,16 +73,6 @@ where self.evm.db().expect("db to not be moved") } - /// Overrides the database - pub fn with_db( - &self, - db: &'a mut SubState, - ) -> Executor<'a, OtherDB> { - let mut evm = EVM::new(); - evm.database(db); - Executor { chain_spec: self.chain_spec.clone(), evm, stack: self.stack.clone() } - } - fn recover_senders( &self, body: &[TransactionSigned], @@ -376,30 +360,6 @@ where } } - /// Execute and verify block - pub fn execute_and_verify_receipt( - &mut self, - block: &Block, - total_difficulty: U256, - senders: Option>, - ) -> Result { - let execution_result = self.execute(block, total_difficulty, senders)?; - - let receipts_iter = - execution_result.tx_changesets.iter().map(|changeset| &changeset.receipt); - - if self.chain_spec.fork(Hardfork::Byzantium).active_at_block(block.header.number) { - verify_receipt(block.header.receipts_root, block.header.logs_bloom, receipts_iter)?; - } - - // TODO Before Byzantium, receipts contained state root that would mean that expensive - // operation as hashing that is needed for state root got calculated in every - // transaction This was replaced with is_success flag. - // See more about EIP here: https://eips.ethereum.org/EIPS/eip-658 - - Ok(execution_result) - } - /// Runs a single transaction in the configured environment and proceeds /// to return the result and state diff (without applying it). /// @@ -487,7 +447,7 @@ where } } -impl<'a, DB> BlockExecutor for Executor<'a, DB> +impl BlockExecutor for Executor where DB: StateProvider, { @@ -521,6 +481,29 @@ where Ok(ExecutionResult { tx_changesets, block_changesets }) } + + fn execute_and_verify_receipt( + &mut self, + block: &Block, + total_difficulty: U256, + senders: Option>, + ) -> Result { + let execution_result = self.execute(block, total_difficulty, senders)?; + + let receipts_iter = + execution_result.tx_changesets.iter().map(|changeset| &changeset.receipt); + + if self.chain_spec.fork(Hardfork::Byzantium).active_at_block(block.header.number) { + verify_receipt(block.header.receipts_root, block.header.logs_bloom, receipts_iter)?; + } + + // TODO Before Byzantium, receipts contained state root that would mean that expensive + // operation as hashing that is needed for state root got calculated in every + // transaction This was replaced with is_success flag. + // See more about EIP here: https://eips.ethereum.org/EIPS/eip-658 + + Ok(execution_result) + } } /// Verify receipts @@ -661,10 +644,10 @@ mod tests { // spec at berlin fork let chain_spec = Arc::new(ChainSpecBuilder::mainnet().berlin_activated().build()); - let mut db = SubState::new(State::new(db)); + let db = SubState::new(State::new(db)); // execute chain and verify receipts - let mut executor = Executor::new(chain_spec, &mut db); + let mut executor = Executor::new(chain_spec, db); let out = executor.execute_and_verify_receipt(&block, U256::ZERO, None).unwrap(); assert_eq!(out.tx_changesets.len(), 1, "Should executed one transaction"); @@ -689,6 +672,7 @@ mod tests { // Check if cache is set // account1 + let db = executor.db(); let cached_acc1 = db.accounts.get(&account1).unwrap(); assert_eq!(cached_acc1.info.balance, account1_info.balance); assert_eq!(cached_acc1.info.nonce, account1_info.nonce); @@ -791,9 +775,9 @@ mod tests { .build(), ); - let mut db = SubState::new(State::new(db)); + let db = SubState::new(State::new(db)); // execute chain and verify receipts - let mut executor = Executor::new(chain_spec, &mut db); + let mut executor = Executor::new(chain_spec, db); let out = executor .execute_and_verify_receipt( &Block { header, body: vec![], ommers: vec![], withdrawals: None }, @@ -805,6 +789,7 @@ mod tests { // Check if cache is set // beneficiary + let db = executor.db(); let dao_beneficiary = db.accounts.get(&crate::eth_dao_fork::DAO_HARDFORK_BENEFICIARY).unwrap(); @@ -881,10 +866,10 @@ mod tests { // spec at berlin fork let chain_spec = Arc::new(ChainSpecBuilder::mainnet().berlin_activated().build()); - let mut db = SubState::new(State::new(db)); + let db = SubState::new(State::new(db)); // execute chain and verify receipts - let mut executor = Executor::new(chain_spec, &mut db); + let mut executor = Executor::new(chain_spec, db); let out = executor.execute_and_verify_receipt(&block, U256::ZERO, None).unwrap(); assert_eq!(out.tx_changesets.len(), 1, "Should executed one transaction"); @@ -930,10 +915,10 @@ mod tests { // spec at shanghai fork let chain_spec = Arc::new(ChainSpecBuilder::mainnet().shanghai_activated().build()); - let mut db = SubState::new(State::new(StateProviderTest::default())); + let db = SubState::new(State::new(StateProviderTest::default())); // execute chain and verify receipts - let mut executor = Executor::new(chain_spec, &mut db); + let mut executor = Executor::new(chain_spec, db); let out = executor.execute_and_verify_receipt(&block, U256::ZERO, None).unwrap(); assert_eq!(out.tx_changesets.len(), 0, "No tx"); diff --git a/crates/executor/src/factory.rs b/crates/executor/src/factory.rs new file mode 100644 index 000000000..5bd59d029 --- /dev/null +++ b/crates/executor/src/factory.rs @@ -0,0 +1,34 @@ +use reth_primitives::ChainSpec; +use reth_provider::{ExecutorFactory, StateProvider}; +use reth_revm::database::{State, SubState}; + +use crate::executor::Executor; +use std::sync::Arc; + +/// Factory that spawn Executor. +#[derive(Clone, Debug)] +pub struct Factory { + chain_spec: Arc, +} + +impl Factory { + /// Create new factory + pub fn new(chain_spec: Arc) -> Self { + Self { chain_spec } + } +} + +impl ExecutorFactory for Factory { + type Executor = Executor; + + /// Executor with [`StateProvider`] + fn with_sp(&self, sp: SP) -> Self::Executor { + let substate = SubState::new(State::new(sp)); + Executor::new(self.chain_spec.clone(), substate) + } + + /// Return internal chainspec + fn chain_spec(&self) -> &ChainSpec { + self.chain_spec.as_ref() + } +} diff --git a/crates/executor/src/lib.rs b/crates/executor/src/lib.rs index 6d28d9648..4bf17e8dc 100644 --- a/crates/executor/src/lib.rs +++ b/crates/executor/src/lib.rs @@ -13,3 +13,7 @@ pub mod eth_dao_fork; pub use reth_provider::execution_result; /// Executor pub mod executor; + +/// ExecutorFactory impl +pub mod factory; +pub use factory::Factory; diff --git a/crates/interfaces/src/executor.rs b/crates/interfaces/src/executor.rs index 489c00810..0165708da 100644 --- a/crates/interfaces/src/executor.rs +++ b/crates/interfaces/src/executor.rs @@ -1,26 +1,6 @@ -use async_trait::async_trait; -use reth_primitives::{Address, Block, Bloom, H256, U256}; +use reth_primitives::{Bloom, H256}; use thiserror::Error; -/// An executor capable of executing a block. -#[async_trait] -pub trait BlockExecutor { - /// Execute a block. - /// - /// The number of `senders` should be equal to the number of transactions in the block. - /// - /// If no senders are specified, the `execute` function MUST recover the senders for the - /// provided block's transactions internally. We use this to allow for calculating senders in - /// parallel in e.g. staged sync, so that execution can happen without paying for sender - /// recovery costs. - fn execute( - &mut self, - block: &Block, - total_difficulty: U256, - senders: Option>, - ) -> Result; -} - /// BlockExecutor Errors #[allow(missing_docs)] #[derive(Error, Debug, Clone, PartialEq, Eq)] diff --git a/crates/rpc/rpc-engine-api/src/engine_api.rs b/crates/rpc/rpc-engine-api/src/engine_api.rs index d52daa303..cc91f293d 100644 --- a/crates/rpc/rpc-engine-api/src/engine_api.rs +++ b/crates/rpc/rpc-engine-api/src/engine_api.rs @@ -6,8 +6,10 @@ use reth_primitives::{ BlockHash, BlockId, BlockNumber, ChainSpec, Hardfork, Header, SealedBlock, TransactionSigned, H64, U256, }; -use reth_provider::{BlockProvider, EvmEnvProvider, HeaderProvider, StateProviderFactory}; -use reth_revm::database::{State, SubState}; +use reth_provider::{ + BlockExecutor, BlockProvider, EvmEnvProvider, ExecutorFactory, HeaderProvider, + StateProviderFactory, +}; use reth_rlp::Decodable; use reth_rpc_types::engine::{ ExecutionPayload, ExecutionPayloadBodies, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, @@ -305,8 +307,8 @@ impl Ok(PayloadStatus::new(PayloadStatusEnum::Valid, block_hash)), Err(err) => Ok(PayloadStatus::new( diff --git a/crates/stages/Cargo.toml b/crates/stages/Cargo.toml index 3a50de8a2..f08754b58 100644 --- a/crates/stages/Cargo.toml +++ b/crates/stages/Cargo.toml @@ -17,7 +17,6 @@ normal = [ # reth libs reth-primitives = { path = "../primitives" } reth-interfaces = { path = "../interfaces" } -reth-executor = { path = "../executor" } reth-revm = { path = "../revm" } reth-rlp = { path = "../rlp" } reth-db = { path = "../storage/db" } @@ -54,6 +53,7 @@ reth-db = { path = "../storage/db", features = ["test-utils", "mdbx"] } reth-interfaces = { path = "../interfaces", features = ["test-utils"] } reth-downloaders = { path = "../net/downloaders" } reth-eth-wire = { path = "../net/eth-wire" } # TODO(onbjerg): We only need this for [BlockBody] +reth-executor = { path = "../executor" } tokio = { version = "*", features = ["rt", "sync", "macros"] } tempfile = "3.3.0" assert_matches = "1.5.0" diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index 8e5e75ec4..562d61665 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -27,7 +27,8 @@ //! # use reth_interfaces::consensus::Consensus; //! # use reth_interfaces::sync::NoopSyncStateUpdate; //! # use reth_interfaces::test_utils::{TestBodiesClient, TestConsensus, TestHeadersClient, TestStatusUpdater}; -//! # use reth_primitives::PeerId; +//! # use reth_executor::Factory; +//! # use reth_primitives::{PeerId,MAINNET}; //! # use reth_stages::Pipeline; //! # use reth_stages::sets::DefaultStages; //! # let consensus: Arc = Arc::new(TestConsensus::default()); @@ -40,12 +41,13 @@ //! # consensus.clone(), //! # create_test_rw_db() //! # ); +//! # let factory = Factory::new(Arc::new(MAINNET.clone())); //! # let (status_updater, _) = TestStatusUpdater::new(); //! // Create a pipeline that can fully sync //! # let pipeline: Pipeline, NoopSyncStateUpdate> = //! Pipeline::builder() //! .add_stages( -//! DefaultStages::new(consensus, headers_downloader, bodies_downloader, status_updater) +//! DefaultStages::new(consensus, headers_downloader, bodies_downloader, status_updater, factory) //! ) //! .build(); //! ``` @@ -55,11 +57,6 @@ mod pipeline; mod stage; mod util; -/// The real database type we use in Reth using MDBX. -pub type DefaultDB<'a> = LatestStateProviderRef<'a, 'a, Tx<'a, RW, WriteMap>>; -use reth_db::mdbx::{tx::Tx, WriteMap, RW}; -use reth_provider::LatestStateProviderRef; - #[allow(missing_docs)] #[cfg(any(test, feature = "test-utils"))] pub mod test_utils; diff --git a/crates/stages/src/sets.rs b/crates/stages/src/sets.rs index dd86ef121..54332e2f6 100644 --- a/crates/stages/src/sets.rs +++ b/crates/stages/src/sets.rs @@ -14,18 +14,27 @@ //! # use reth_interfaces::sync::NoopSyncStateUpdate; //! # use reth_stages::Pipeline; //! # use reth_stages::sets::{OfflineStages}; +//! # use reth_executor::Factory; +//! # use reth_primitives::MAINNET; +//! # use std::sync::Arc; +//! +//! # let factory = Factory::new(Arc::new(MAINNET.clone())); //! // Build a pipeline with all offline stages. //! # let pipeline: Pipeline, NoopSyncStateUpdate> = -//! Pipeline::builder().add_stages(OfflineStages::default()).build(); +//! Pipeline::builder().add_stages(OfflineStages::new(factory)).build(); //! ``` //! //! ```ignore //! # use reth_stages::Pipeline; //! # use reth_stages::{StageSet, sets::OfflineStages}; +//! # use reth_executor::Factory; +//! # use reth_primitives::MAINNET; +//! # use std::sync::Arc; //! // Build a pipeline with all offline stages and a custom stage at the end. +//! # let factory = Factory::new(Arc::new(MAINNET.clone())); //! Pipeline::builder() //! .add_stages( -//! OfflineStages::default().builder().add_stage(MyCustomStage) +//! OfflineStages::new(factory).builder().add_stage(MyCustomStage) //! ) //! .build(); //! ``` @@ -45,7 +54,7 @@ use reth_interfaces::{ headers::{client::StatusUpdater, downloader::HeaderDownloader}, }, }; -use reth_primitives::ChainSpec; +use reth_provider::ExecutorFactory; use std::sync::Arc; /// A set containing all stages to run a fully syncing instance of reth. @@ -56,39 +65,47 @@ use std::sync::Arc; /// - [`OfflineStages`] /// - [`FinishStage`] #[derive(Debug)] -pub struct DefaultStages { +pub struct DefaultStages { /// Configuration for the online stages online: OnlineStages, + /// Executor factory needs for execution stage + executor_factory: EF, /// Configuration for the [`FinishStage`] stage. status_updater: S, } -impl DefaultStages { +impl DefaultStages { /// Create a new set of default stages with default values. pub fn new( consensus: Arc, header_downloader: H, body_downloader: B, status_updater: S, - ) -> Self { + executor_factory: EF, + ) -> Self + where + EF: ExecutorFactory, + { Self { online: OnlineStages::new(consensus, header_downloader, body_downloader), + executor_factory, status_updater, } } } -impl StageSet for DefaultStages +impl StageSet for DefaultStages where DB: Database, H: HeaderDownloader + 'static, B: BodyDownloader + 'static, S: StatusUpdater + 'static, + EF: ExecutorFactory, { fn builder(self) -> StageSetBuilder { self.online .builder() - .add_set(OfflineStages) + .add_set(OfflineStages::new(self.executor_factory)) .add_stage(FinishStage::new(self.status_updater)) } } @@ -137,40 +154,47 @@ where /// - [`HistoryIndexingStages`] #[derive(Debug, Default)] #[non_exhaustive] -pub struct OfflineStages; +pub struct OfflineStages { + /// Executor factory needs for execution stage + pub executor_factory: EF, +} -impl StageSet for OfflineStages { +impl OfflineStages { + /// Create a new set of ofline stages with default values. + pub fn new(executor_factory: EF) -> Self { + Self { executor_factory } + } +} + +impl StageSet for OfflineStages { fn builder(self) -> StageSetBuilder { - ExecutionStages::default().builder().add_set(HashingStages).add_set(HistoryIndexingStages) + ExecutionStages::new(self.executor_factory) + .builder() + .add_set(HashingStages) + .add_set(HistoryIndexingStages) } } /// A set containing all stages that are required to execute pre-existing block data. #[derive(Debug)] #[non_exhaustive] -pub struct ExecutionStages { - /// The chain specification to use for execution. - chain_spec: ChainSpec, +pub struct ExecutionStages { + /// Executor factory that will create executors. + executor_factory: EF, } -impl Default for ExecutionStages { - fn default() -> Self { - Self { chain_spec: reth_primitives::MAINNET.clone() } - } -} - -impl ExecutionStages { +impl ExecutionStages { /// Create a new set of execution stages with default values. - pub fn new(chain_spec: ChainSpec) -> Self { - Self { chain_spec } + pub fn new(executor_factory: EF) -> Self { + Self { executor_factory } } } -impl StageSet for ExecutionStages { +impl StageSet for ExecutionStages { fn builder(self) -> StageSetBuilder { StageSetBuilder::default() .add_stage(SenderRecoveryStage::default()) - .add_stage(ExecutionStage::>::from(self.chain_spec)) + .add_stage(ExecutionStage::new(self.executor_factory, 10_000)) } } diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 633bfec4a..4312a3ca0 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -1,6 +1,6 @@ use crate::{ - exec_or_return, DefaultDB, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId, - UnwindInput, UnwindOutput, + exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, + UnwindOutput, }; use reth_db::{ cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}, @@ -9,11 +9,9 @@ use reth_db::{ tables, transaction::{DbTx, DbTxMut}, }; -use reth_executor::executor::Executor; use reth_interfaces::provider::ProviderError; -use reth_primitives::{Address, Block, ChainSpec, U256}; -use reth_provider::{LatestStateProviderRef, StateProvider, Transaction}; -use reth_revm::database::{State, SubState}; +use reth_primitives::{Address, Block, U256}; +use reth_provider::{BlockExecutor, ExecutorFactory, LatestStateProviderRef, Transaction}; use tracing::*; /// The [`StageId`] of the execution stage. @@ -49,33 +47,28 @@ pub const EXECUTION: StageId = StageId("Execution"); /// to [tables::PlainStorageState] // false positive, we cannot derive it if !DB: Debug. #[allow(missing_debug_implementations)] -pub struct ExecutionStage<'a, DB = DefaultDB<'a>> -where - DB: StateProvider, -{ +pub struct ExecutionStage { /// The stage's internal executor - pub executor: Executor<'a, DB>, + pub executor_factory: EF, /// Commit threshold pub commit_threshold: u64, } -impl<'a, DB: StateProvider> From> for ExecutionStage<'a, DB> { - fn from(executor: Executor<'a, DB>) -> Self { - Self { executor, commit_threshold: 1_000 } +impl ExecutionStage { + /// Create new execution stage with specified config. + pub fn new(executor_factory: EF, commit_threshold: u64) -> Self { + Self { executor_factory, commit_threshold } } -} -impl<'a, DB: StateProvider> From for ExecutionStage<'a, DB> { - fn from(chain_spec: ChainSpec) -> Self { - let executor = Executor::from(chain_spec); - Self::from(executor) + /// Create execution stage with executor factory and default commit threshold set to 10_000 + /// blocks + pub fn new_default_threshold(executor_factory: EF) -> Self { + Self { executor_factory, commit_threshold: 10_000 } } -} -impl<'a, S: StateProvider> ExecutionStage<'a, S> { /// Execute the stage. pub fn execute_inner( - &mut self, + &self, tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { @@ -116,7 +109,7 @@ impl<'a, S: StateProvider> ExecutionStage<'a, S> { // Create state provider with cached state - let mut state_provider = SubState::new(State::new(LatestStateProviderRef::new(&**tx))); + let mut executor = self.executor_factory.with_sp(LatestStateProviderRef::new(&**tx)); // Fetch transactions, execute them and generate results let mut changesets = Vec::with_capacity(block_batch.len()); @@ -154,7 +147,6 @@ impl<'a, S: StateProvider> ExecutionStage<'a, S> { trace!(target: "sync::stages::execution", number = block_number, txs = transactions.len(), "Executing block"); // Configure the executor to use the current state. - let mut executor = self.executor.with_db(&mut state_provider); let changeset = executor .execute_and_verify_receipt( &Block { header, body: transactions, ommers, withdrawals }, @@ -166,7 +158,7 @@ impl<'a, S: StateProvider> ExecutionStage<'a, S> { } // put execution results to database - tx.insert_execution_result(changesets, &self.executor.chain_spec, last_block)?; + tx.insert_execution_result(changesets, self.executor_factory.chain_spec(), last_block)?; let done = !capped; info!(target: "sync::stages::execution", stage_progress = end_block, done, "Sync iteration finished"); @@ -174,15 +166,8 @@ impl<'a, S: StateProvider> ExecutionStage<'a, S> { } } -impl<'a, DB: StateProvider> ExecutionStage<'a, DB> { - /// Create new execution stage with specified config. - pub fn new(executor: Executor<'a, DB>, commit_threshold: u64) -> Self { - Self { executor, commit_threshold } - } -} - #[async_trait::async_trait] -impl Stage for ExecutionStage<'_, State> { +impl Stage for ExecutionStage { /// Return the id of the stage fn id(&self) -> StageId { EXECUTION @@ -306,13 +291,23 @@ mod tests { mdbx::{test_utils::create_test_db, EnvKind, WriteMap}, models::AccountBeforeTx, }; + use reth_executor::Factory; use reth_primitives::{ hex_literal::hex, keccak256, Account, ChainSpecBuilder, SealedBlock, StorageEntry, H160, - H256, MAINNET, U256, + H256, U256, }; use reth_provider::insert_canonical_block; use reth_rlp::Decodable; - use std::ops::{Deref, DerefMut}; + use std::{ + ops::{Deref, DerefMut}, + sync::Arc, + }; + + fn stage() -> ExecutionStage { + let factory = + Factory::new(Arc::new(ChainSpecBuilder::mainnet().berlin_activated().build())); + ExecutionStage::new(factory, 100) + } #[tokio::test] async fn sanity_execution_of_block() { @@ -355,8 +350,7 @@ mod tests { db_tx.put::(code_hash, code.to_vec()).unwrap(); tx.commit().unwrap(); - let chain_spec = ChainSpecBuilder::mainnet().berlin_activated().build(); - let mut execution_stage = ExecutionStage::>::from(chain_spec); + let mut execution_stage = stage(); let output = execution_stage.execute(&mut tx, input).await.unwrap(); tx.commit().unwrap(); assert_eq!(output, ExecOutput { stage_progress: 1, done: true }); @@ -440,12 +434,12 @@ mod tests { tx.commit().unwrap(); // execute - let chain_spec = ChainSpecBuilder::mainnet().berlin_activated().build(); - let mut execution_stage = ExecutionStage::>::from(chain_spec); + let mut execution_stage = stage(); let _ = execution_stage.execute(&mut tx, input).await.unwrap(); tx.commit().unwrap(); - let o = ExecutionStage::>::from(MAINNET.clone()) + let mut stage = stage(); + let o = stage .unwind(&mut tx, UnwindInput { stage_progress: 1, unwind_to: 0, bad_block: None }) .await .unwrap(); @@ -526,8 +520,7 @@ mod tests { tx.commit().unwrap(); // execute - let chain_spec = ChainSpecBuilder::mainnet().berlin_activated().build(); - let mut execution_stage = ExecutionStage::>::from(chain_spec); + let mut execution_stage = stage(); let _ = execution_stage.execute(&mut tx, input).await.unwrap(); tx.commit().unwrap(); diff --git a/crates/storage/db/benches/utils.rs b/crates/storage/db/benches/utils.rs index 29561dfe6..8f159ec1d 100644 --- a/crates/storage/db/benches/utils.rs +++ b/crates/storage/db/benches/utils.rs @@ -47,6 +47,7 @@ where /// Sets up a clear database at `bench_db_path`. #[allow(clippy::ptr_arg)] +#[allow(unused)] fn set_up_db( bench_db_path: &Path, pair: &Vec<(::Key, bytes::Bytes, ::Value, bytes::Bytes)>, diff --git a/crates/storage/provider/src/lib.rs b/crates/storage/provider/src/lib.rs index c9f2496dc..3e08932ec 100644 --- a/crates/storage/provider/src/lib.rs +++ b/crates/storage/provider/src/lib.rs @@ -11,8 +11,9 @@ /// Various provider traits. mod traits; pub use traits::{ - AccountProvider, BlockHashProvider, BlockIdProvider, BlockProvider, EvmEnvProvider, - HeaderProvider, StateProvider, StateProviderFactory, TransactionsProvider, WithdrawalsProvider, + AccountProvider, BlockExecutor, BlockHashProvider, BlockIdProvider, BlockProvider, + EvmEnvProvider, ExecutorFactory, HeaderProvider, StateProvider, StateProviderFactory, + TransactionsProvider, WithdrawalsProvider, }; /// Provider trait implementations. diff --git a/crates/storage/provider/src/traits/executor.rs b/crates/storage/provider/src/traits/executor.rs new file mode 100644 index 000000000..98150d506 --- /dev/null +++ b/crates/storage/provider/src/traits/executor.rs @@ -0,0 +1,45 @@ +//! Executor Factory + +use crate::{execution_result::ExecutionResult, StateProvider}; +use reth_interfaces::executor::Error; +use reth_primitives::{Address, Block, ChainSpec, U256}; + +/// Executor factory that would create the EVM with particular state provider. +/// +/// It can be used to mock executor. +pub trait ExecutorFactory: Send + Sync + 'static { + /// The executor produced by the factory + type Executor: BlockExecutor; + + /// Executor with [`StateProvider`] + fn with_sp(&self, sp: SP) -> Self::Executor; + + /// Return internal chainspec + fn chain_spec(&self) -> &ChainSpec; +} + +/// An executor capable of executing a block. +pub trait BlockExecutor { + /// Execute a block. + /// + /// The number of `senders` should be equal to the number of transactions in the block. + /// + /// If no senders are specified, the `execute` function MUST recover the senders for the + /// provided block's transactions internally. We use this to allow for calculating senders in + /// parallel in e.g. staged sync, so that execution can happen without paying for sender + /// recovery costs. + fn execute( + &mut self, + block: &Block, + total_difficulty: U256, + senders: Option>, + ) -> Result; + + /// Executes the block and checks receipts + fn execute_and_verify_receipt( + &mut self, + block: &Block, + total_difficulty: U256, + senders: Option>, + ) -> Result; +} diff --git a/crates/storage/provider/src/traits/mod.rs b/crates/storage/provider/src/traits/mod.rs index 7552ca7f6..12d9adf4a 100644 --- a/crates/storage/provider/src/traits/mod.rs +++ b/crates/storage/provider/src/traits/mod.rs @@ -26,3 +26,6 @@ pub use transactions::TransactionsProvider; mod withdrawals; pub use withdrawals::WithdrawalsProvider; + +mod executor; +pub use executor::{BlockExecutor, ExecutorFactory};