feat: ExecutorFactory (#1672)

This commit is contained in:
rakita
2023-03-08 16:01:56 +01:00
committed by GitHub
parent 57e36223f7
commit 21c66621dd
21 changed files with 276 additions and 196 deletions

1
Cargo.lock generated
View File

@ -4369,6 +4369,7 @@ dependencies = [
"reth-db",
"reth-discv4",
"reth-downloaders",
"reth-executor",
"reth-interfaces",
"reth-net-nat",
"reth-network",

View File

@ -17,6 +17,7 @@ reth-stages = { path = "../../crates/stages"}
reth-interfaces = { path = "../../crates/interfaces", features = ["test-utils"] }
reth-transaction-pool = { path = "../../crates/transaction-pool", features = ["test-utils"] }
reth-consensus = { path = "../../crates/consensus" }
reth-executor = { path = "../../crates/executor" }
reth-rpc-engine-api = { path = "../../crates/rpc/rpc-engine-api" }
reth-rpc-builder = { path = "../../crates/rpc/rpc-builder" }
reth-rpc = { path = "../../crates/rpc/rpc" }

View File

@ -27,7 +27,6 @@ use reth_staged_sync::{
use reth_stages::{
prelude::*,
stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage},
DefaultDB,
};
use std::sync::Arc;
use tracing::{debug, info};
@ -141,6 +140,8 @@ impl ImportCommand {
.build(file_client.clone(), consensus.clone(), db)
.into_task();
let factory = reth_executor::Factory::new(Arc::new(self.chain.clone()));
let mut pipeline = Pipeline::builder()
.with_sync_state_updater(file_client)
.add_stages(
@ -149,6 +150,7 @@ impl ImportCommand {
header_downloader,
body_downloader,
NoopStatusUpdater::default(),
factory.clone(),
)
.set(TotalDifficultyStage {
chain_spec: self.chain.clone(),
@ -157,12 +159,7 @@ impl ImportCommand {
.set(SenderRecoveryStage {
commit_threshold: config.stages.sender_recovery.commit_threshold,
})
.set({
let mut stage: ExecutionStage<'_, DefaultDB<'_>> =
ExecutionStage::from(self.chain.clone());
stage.commit_threshold = config.stages.execution.commit_threshold;
stage
}),
.set(ExecutionStage::new(factory, config.stages.execution.commit_threshold)),
)
.with_max_block(0)
.build();

View File

@ -9,8 +9,8 @@ use reth_db::{
};
use reth_primitives::MAINNET;
use reth_provider::Transaction;
use reth_stages::{stages::ExecutionStage, DefaultDB, Stage, StageId, UnwindInput};
use std::ops::DerefMut;
use reth_stages::{stages::ExecutionStage, Stage, StageId, UnwindInput};
use std::{ops::DerefMut, sync::Arc};
use tracing::info;
pub(crate) async fn dump_execution_stage<DB: Database>(
@ -97,7 +97,10 @@ async fn unwind_and_copy<DB: Database>(
output_db: &reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
) -> eyre::Result<()> {
let mut unwind_tx = Transaction::new(db_tool.db)?;
let mut exec_stage: ExecutionStage<'_, DefaultDB<'_>> = ExecutionStage::from(MAINNET.clone());
let mut exec_stage = ExecutionStage::new_default_threshold(reth_executor::Factory::new(
Arc::new(MAINNET.clone()),
));
exec_stage
.unwind(
@ -126,7 +129,9 @@ async fn dry_run(
info!(target: "reth::cli", "Executing stage. [dry-run]");
let mut tx = Transaction::new(&output_db)?;
let mut exec_stage: ExecutionStage<'_, DefaultDB<'_>> = ExecutionStage::from(MAINNET.clone());
let mut exec_stage = ExecutionStage::new_default_threshold(reth_executor::Factory::new(
Arc::new(MAINNET.clone()),
));
exec_stage
.execute(

View File

@ -9,9 +9,9 @@ use reth_primitives::MAINNET;
use reth_provider::Transaction;
use reth_stages::{
stages::{AccountHashingStage, ExecutionStage, MerkleStage, StorageHashingStage},
DefaultDB, Stage, StageId, UnwindInput,
Stage, StageId, UnwindInput,
};
use std::ops::DerefMut;
use std::{ops::DerefMut, sync::Arc};
use tracing::info;
pub(crate) async fn dump_merkle_stage<DB: Database>(
@ -75,7 +75,10 @@ async fn unwind_and_copy<DB: Database>(
MerkleStage::default_unwind().unwind(&mut unwind_tx, unwind).await?;
// Bring Plainstate to TO (hashing stage execution requires it)
let mut exec_stage: ExecutionStage<'_, DefaultDB<'_>> = ExecutionStage::from(MAINNET.clone());
let mut exec_stage = ExecutionStage::new_default_threshold(reth_executor::Factory::new(
Arc::new(MAINNET.clone()),
));
exec_stage.commit_threshold = u64::MAX;
exec_stage
.unwind(

View File

@ -50,7 +50,6 @@ use reth_staged_sync::{
use reth_stages::{
prelude::*,
stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage, FINISH},
DefaultDB,
};
use reth_tasks::TaskExecutor;
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
@ -444,23 +443,25 @@ impl Command {
builder = builder.with_max_block(max_block)
}
let factory = reth_executor::Factory::new(Arc::new(self.chain.clone()));
let pipeline = builder
.with_sync_state_updater(updater.clone())
.add_stages(
DefaultStages::new(consensus.clone(), header_downloader, body_downloader, updater)
.set(TotalDifficultyStage {
chain_spec: self.chain.clone(),
commit_threshold: stage_conf.total_difficulty.commit_threshold,
})
.set(SenderRecoveryStage {
commit_threshold: stage_conf.sender_recovery.commit_threshold,
})
.set({
let mut stage: ExecutionStage<'_, DefaultDB<'_>> =
ExecutionStage::from(self.chain.clone());
stage.commit_threshold = stage_conf.execution.commit_threshold;
stage
}),
DefaultStages::new(
consensus.clone(),
header_downloader,
body_downloader,
updater,
factory.clone(),
)
.set(TotalDifficultyStage {
chain_spec: self.chain.clone(),
commit_threshold: stage_conf.total_difficulty.commit_threshold,
})
.set(SenderRecoveryStage {
commit_threshold: stage_conf.sender_recovery.commit_threshold,
})
.set(ExecutionStage::new(factory, stage_conf.execution.commit_threshold)),
)
.build();

View File

@ -17,7 +17,7 @@ use reth_staged_sync::{
};
use reth_stages::{
stages::{BodyStage, ExecutionStage, SenderRecoveryStage},
DefaultDB, ExecInput, Stage, StageId, UnwindInput,
ExecInput, Stage, StageId, UnwindInput,
};
use std::{net::SocketAddr, sync::Arc};
use tracing::*;
@ -171,7 +171,8 @@ impl Command {
stage.execute(&mut tx, input).await?;
}
StageEnum::Execution => {
let mut stage = ExecutionStage::<DefaultDB<'_>>::from(self.chain.clone());
let factory = reth_executor::Factory::new(Arc::new(self.chain.clone()));
let mut stage = ExecutionStage::new(factory, 10_000);
stage.commit_threshold = num_blocks;
if !self.skip_unwind {
stage.unwind(&mut tx, unwind).await?;

View File

@ -15,11 +15,12 @@ use reth_primitives::{
};
use reth_provider::Transaction;
use reth_rlp::Decodable;
use reth_stages::{stages::ExecutionStage, DefaultDB, ExecInput, Stage, StageId};
use reth_stages::{stages::ExecutionStage, ExecInput, Stage, StageId};
use std::{
collections::HashMap,
ffi::OsStr,
path::{Path, PathBuf},
sync::Arc,
};
use tracing::{debug, trace};
@ -193,7 +194,8 @@ pub async fn run_test(path: PathBuf) -> eyre::Result<TestOutcome> {
// Initialize the execution stage
// Hardcode the chain_id to Ethereum 1.
let mut stage = ExecutionStage::<DefaultDB<'_>>::from(chain_spec);
let factory = reth_executor::Factory::new(Arc::new(chain_spec));
let mut stage = ExecutionStage::new(factory, 1_000);
// Call execution stage
let input = ExecInput {

View File

@ -1,14 +1,12 @@
use reth_interfaces::executor::{BlockExecutor, Error};
use crate::execution_result::{
AccountChangeSet, AccountInfoChangeSet, ExecutionResult, TransactionChangeSet,
};
use reth_interfaces::executor::Error;
use reth_primitives::{
bloom::logs_bloom, Account, Address, Block, Bloom, ChainSpec, Hardfork, Header, Log, Receipt,
TransactionSigned, H256, U256,
};
use reth_provider::{
execution_result::{
AccountChangeSet, AccountInfoChangeSet, ExecutionResult, TransactionChangeSet,
},
StateProvider,
};
use reth_provider::{BlockExecutor, StateProvider};
use reth_revm::{
config::{WEI_2ETH, WEI_3ETH, WEI_5ETH},
database::SubState,
@ -30,38 +28,34 @@ use std::{
};
/// Main block executor
pub struct Executor<'a, DB>
pub struct Executor<DB>
where
DB: StateProvider,
{
/// The configured chain-spec
pub chain_spec: Arc<ChainSpec>,
evm: EVM<&'a mut SubState<DB>>,
evm: EVM<SubState<DB>>,
stack: InspectorStack,
}
impl<'a, DB> From<ChainSpec> for Executor<'a, DB>
impl<DB> From<Arc<ChainSpec>> for Executor<DB>
where
DB: StateProvider,
{
/// Instantiates a new executor from the chainspec. Must call
/// `with_db` to set the database before executing.
fn from(chain_spec: ChainSpec) -> Self {
fn from(chain_spec: Arc<ChainSpec>) -> Self {
let evm = EVM::new();
Executor {
chain_spec: Arc::new(chain_spec),
evm,
stack: InspectorStack::new(InspectorStackConfig::default()),
}
Executor { chain_spec, evm, stack: InspectorStack::new(InspectorStackConfig::default()) }
}
}
impl<'a, DB> Executor<'a, DB>
impl<DB> Executor<DB>
where
DB: StateProvider,
{
/// Creates a new executor from the given chain spec and database.
pub fn new(chain_spec: Arc<ChainSpec>, db: &'a mut SubState<DB>) -> Self {
pub fn new(chain_spec: Arc<ChainSpec>, db: SubState<DB>) -> Self {
let mut evm = EVM::new();
evm.database(db);
@ -79,16 +73,6 @@ where
self.evm.db().expect("db to not be moved")
}
/// Overrides the database
pub fn with_db<OtherDB: StateProvider>(
&self,
db: &'a mut SubState<OtherDB>,
) -> Executor<'a, OtherDB> {
let mut evm = EVM::new();
evm.database(db);
Executor { chain_spec: self.chain_spec.clone(), evm, stack: self.stack.clone() }
}
fn recover_senders(
&self,
body: &[TransactionSigned],
@ -376,30 +360,6 @@ where
}
}
/// Execute and verify block
pub fn execute_and_verify_receipt(
&mut self,
block: &Block,
total_difficulty: U256,
senders: Option<Vec<Address>>,
) -> Result<ExecutionResult, Error> {
let execution_result = self.execute(block, total_difficulty, senders)?;
let receipts_iter =
execution_result.tx_changesets.iter().map(|changeset| &changeset.receipt);
if self.chain_spec.fork(Hardfork::Byzantium).active_at_block(block.header.number) {
verify_receipt(block.header.receipts_root, block.header.logs_bloom, receipts_iter)?;
}
// TODO Before Byzantium, receipts contained state root that would mean that expensive
// operation as hashing that is needed for state root got calculated in every
// transaction This was replaced with is_success flag.
// See more about EIP here: https://eips.ethereum.org/EIPS/eip-658
Ok(execution_result)
}
/// Runs a single transaction in the configured environment and proceeds
/// to return the result and state diff (without applying it).
///
@ -487,7 +447,7 @@ where
}
}
impl<'a, DB> BlockExecutor<ExecutionResult> for Executor<'a, DB>
impl<DB> BlockExecutor<DB> for Executor<DB>
where
DB: StateProvider,
{
@ -521,6 +481,29 @@ where
Ok(ExecutionResult { tx_changesets, block_changesets })
}
fn execute_and_verify_receipt(
&mut self,
block: &Block,
total_difficulty: U256,
senders: Option<Vec<Address>>,
) -> Result<ExecutionResult, Error> {
let execution_result = self.execute(block, total_difficulty, senders)?;
let receipts_iter =
execution_result.tx_changesets.iter().map(|changeset| &changeset.receipt);
if self.chain_spec.fork(Hardfork::Byzantium).active_at_block(block.header.number) {
verify_receipt(block.header.receipts_root, block.header.logs_bloom, receipts_iter)?;
}
// TODO Before Byzantium, receipts contained state root that would mean that expensive
// operation as hashing that is needed for state root got calculated in every
// transaction This was replaced with is_success flag.
// See more about EIP here: https://eips.ethereum.org/EIPS/eip-658
Ok(execution_result)
}
}
/// Verify receipts
@ -661,10 +644,10 @@ mod tests {
// spec at berlin fork
let chain_spec = Arc::new(ChainSpecBuilder::mainnet().berlin_activated().build());
let mut db = SubState::new(State::new(db));
let db = SubState::new(State::new(db));
// execute chain and verify receipts
let mut executor = Executor::new(chain_spec, &mut db);
let mut executor = Executor::new(chain_spec, db);
let out = executor.execute_and_verify_receipt(&block, U256::ZERO, None).unwrap();
assert_eq!(out.tx_changesets.len(), 1, "Should executed one transaction");
@ -689,6 +672,7 @@ mod tests {
// Check if cache is set
// account1
let db = executor.db();
let cached_acc1 = db.accounts.get(&account1).unwrap();
assert_eq!(cached_acc1.info.balance, account1_info.balance);
assert_eq!(cached_acc1.info.nonce, account1_info.nonce);
@ -791,9 +775,9 @@ mod tests {
.build(),
);
let mut db = SubState::new(State::new(db));
let db = SubState::new(State::new(db));
// execute chain and verify receipts
let mut executor = Executor::new(chain_spec, &mut db);
let mut executor = Executor::new(chain_spec, db);
let out = executor
.execute_and_verify_receipt(
&Block { header, body: vec![], ommers: vec![], withdrawals: None },
@ -805,6 +789,7 @@ mod tests {
// Check if cache is set
// beneficiary
let db = executor.db();
let dao_beneficiary =
db.accounts.get(&crate::eth_dao_fork::DAO_HARDFORK_BENEFICIARY).unwrap();
@ -881,10 +866,10 @@ mod tests {
// spec at berlin fork
let chain_spec = Arc::new(ChainSpecBuilder::mainnet().berlin_activated().build());
let mut db = SubState::new(State::new(db));
let db = SubState::new(State::new(db));
// execute chain and verify receipts
let mut executor = Executor::new(chain_spec, &mut db);
let mut executor = Executor::new(chain_spec, db);
let out = executor.execute_and_verify_receipt(&block, U256::ZERO, None).unwrap();
assert_eq!(out.tx_changesets.len(), 1, "Should executed one transaction");
@ -930,10 +915,10 @@ mod tests {
// spec at shanghai fork
let chain_spec = Arc::new(ChainSpecBuilder::mainnet().shanghai_activated().build());
let mut db = SubState::new(State::new(StateProviderTest::default()));
let db = SubState::new(State::new(StateProviderTest::default()));
// execute chain and verify receipts
let mut executor = Executor::new(chain_spec, &mut db);
let mut executor = Executor::new(chain_spec, db);
let out = executor.execute_and_verify_receipt(&block, U256::ZERO, None).unwrap();
assert_eq!(out.tx_changesets.len(), 0, "No tx");

View File

@ -0,0 +1,34 @@
use reth_primitives::ChainSpec;
use reth_provider::{ExecutorFactory, StateProvider};
use reth_revm::database::{State, SubState};
use crate::executor::Executor;
use std::sync::Arc;
/// Factory that spawn Executor.
#[derive(Clone, Debug)]
pub struct Factory {
chain_spec: Arc<ChainSpec>,
}
impl Factory {
/// Create new factory
pub fn new(chain_spec: Arc<ChainSpec>) -> Self {
Self { chain_spec }
}
}
impl ExecutorFactory for Factory {
type Executor<SP: StateProvider> = Executor<SP>;
/// Executor with [`StateProvider`]
fn with_sp<SP: StateProvider>(&self, sp: SP) -> Self::Executor<SP> {
let substate = SubState::new(State::new(sp));
Executor::new(self.chain_spec.clone(), substate)
}
/// Return internal chainspec
fn chain_spec(&self) -> &ChainSpec {
self.chain_spec.as_ref()
}
}

View File

@ -13,3 +13,7 @@ pub mod eth_dao_fork;
pub use reth_provider::execution_result;
/// Executor
pub mod executor;
/// ExecutorFactory impl
pub mod factory;
pub use factory::Factory;

View File

@ -1,26 +1,6 @@
use async_trait::async_trait;
use reth_primitives::{Address, Block, Bloom, H256, U256};
use reth_primitives::{Bloom, H256};
use thiserror::Error;
/// An executor capable of executing a block.
#[async_trait]
pub trait BlockExecutor<T> {
/// Execute a block.
///
/// The number of `senders` should be equal to the number of transactions in the block.
///
/// If no senders are specified, the `execute` function MUST recover the senders for the
/// provided block's transactions internally. We use this to allow for calculating senders in
/// parallel in e.g. staged sync, so that execution can happen without paying for sender
/// recovery costs.
fn execute(
&mut self,
block: &Block,
total_difficulty: U256,
senders: Option<Vec<Address>>,
) -> Result<T, Error>;
}
/// BlockExecutor Errors
#[allow(missing_docs)]
#[derive(Error, Debug, Clone, PartialEq, Eq)]

View File

@ -6,8 +6,10 @@ use reth_primitives::{
BlockHash, BlockId, BlockNumber, ChainSpec, Hardfork, Header, SealedBlock, TransactionSigned,
H64, U256,
};
use reth_provider::{BlockProvider, EvmEnvProvider, HeaderProvider, StateProviderFactory};
use reth_revm::database::{State, SubState};
use reth_provider::{
BlockExecutor, BlockProvider, EvmEnvProvider, ExecutorFactory, HeaderProvider,
StateProviderFactory,
};
use reth_rlp::Decodable;
use reth_rpc_types::engine::{
ExecutionPayload, ExecutionPayloadBodies, ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
@ -305,8 +307,8 @@ impl<Client: HeaderProvider + BlockProvider + StateProviderFactory + EvmEnvProvi
let state_provider = self.client.latest()?;
let total_difficulty = parent_td + block.header.difficulty;
let mut db = SubState::new(State::new(&state_provider));
let mut executor = reth_executor::executor::Executor::new(self.chain_spec.clone(), &mut db);
let factory = reth_executor::Factory::new(self.chain_spec.clone());
let mut executor = factory.with_sp(&state_provider);
match executor.execute_and_verify_receipt(&block.unseal(), total_difficulty, None) {
Ok(_) => Ok(PayloadStatus::new(PayloadStatusEnum::Valid, block_hash)),
Err(err) => Ok(PayloadStatus::new(

View File

@ -17,7 +17,6 @@ normal = [
# reth libs
reth-primitives = { path = "../primitives" }
reth-interfaces = { path = "../interfaces" }
reth-executor = { path = "../executor" }
reth-revm = { path = "../revm" }
reth-rlp = { path = "../rlp" }
reth-db = { path = "../storage/db" }
@ -54,6 +53,7 @@ reth-db = { path = "../storage/db", features = ["test-utils", "mdbx"] }
reth-interfaces = { path = "../interfaces", features = ["test-utils"] }
reth-downloaders = { path = "../net/downloaders" }
reth-eth-wire = { path = "../net/eth-wire" } # TODO(onbjerg): We only need this for [BlockBody]
reth-executor = { path = "../executor" }
tokio = { version = "*", features = ["rt", "sync", "macros"] }
tempfile = "3.3.0"
assert_matches = "1.5.0"

View File

@ -27,7 +27,8 @@
//! # use reth_interfaces::consensus::Consensus;
//! # use reth_interfaces::sync::NoopSyncStateUpdate;
//! # use reth_interfaces::test_utils::{TestBodiesClient, TestConsensus, TestHeadersClient, TestStatusUpdater};
//! # use reth_primitives::PeerId;
//! # use reth_executor::Factory;
//! # use reth_primitives::{PeerId,MAINNET};
//! # use reth_stages::Pipeline;
//! # use reth_stages::sets::DefaultStages;
//! # let consensus: Arc<dyn Consensus> = Arc::new(TestConsensus::default());
@ -40,12 +41,13 @@
//! # consensus.clone(),
//! # create_test_rw_db()
//! # );
//! # let factory = Factory::new(Arc::new(MAINNET.clone()));
//! # let (status_updater, _) = TestStatusUpdater::new();
//! // Create a pipeline that can fully sync
//! # let pipeline: Pipeline<Env<WriteMap>, NoopSyncStateUpdate> =
//! Pipeline::builder()
//! .add_stages(
//! DefaultStages::new(consensus, headers_downloader, bodies_downloader, status_updater)
//! DefaultStages::new(consensus, headers_downloader, bodies_downloader, status_updater, factory)
//! )
//! .build();
//! ```
@ -55,11 +57,6 @@ mod pipeline;
mod stage;
mod util;
/// The real database type we use in Reth using MDBX.
pub type DefaultDB<'a> = LatestStateProviderRef<'a, 'a, Tx<'a, RW, WriteMap>>;
use reth_db::mdbx::{tx::Tx, WriteMap, RW};
use reth_provider::LatestStateProviderRef;
#[allow(missing_docs)]
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;

View File

@ -14,18 +14,27 @@
//! # use reth_interfaces::sync::NoopSyncStateUpdate;
//! # use reth_stages::Pipeline;
//! # use reth_stages::sets::{OfflineStages};
//! # use reth_executor::Factory;
//! # use reth_primitives::MAINNET;
//! # use std::sync::Arc;
//!
//! # let factory = Factory::new(Arc::new(MAINNET.clone()));
//! // Build a pipeline with all offline stages.
//! # let pipeline: Pipeline<Env<WriteMap>, NoopSyncStateUpdate> =
//! Pipeline::builder().add_stages(OfflineStages::default()).build();
//! Pipeline::builder().add_stages(OfflineStages::new(factory)).build();
//! ```
//!
//! ```ignore
//! # use reth_stages::Pipeline;
//! # use reth_stages::{StageSet, sets::OfflineStages};
//! # use reth_executor::Factory;
//! # use reth_primitives::MAINNET;
//! # use std::sync::Arc;
//! // Build a pipeline with all offline stages and a custom stage at the end.
//! # let factory = Factory::new(Arc::new(MAINNET.clone()));
//! Pipeline::builder()
//! .add_stages(
//! OfflineStages::default().builder().add_stage(MyCustomStage)
//! OfflineStages::new(factory).builder().add_stage(MyCustomStage)
//! )
//! .build();
//! ```
@ -45,7 +54,7 @@ use reth_interfaces::{
headers::{client::StatusUpdater, downloader::HeaderDownloader},
},
};
use reth_primitives::ChainSpec;
use reth_provider::ExecutorFactory;
use std::sync::Arc;
/// A set containing all stages to run a fully syncing instance of reth.
@ -56,39 +65,47 @@ use std::sync::Arc;
/// - [`OfflineStages`]
/// - [`FinishStage`]
#[derive(Debug)]
pub struct DefaultStages<H, B, S> {
pub struct DefaultStages<H, B, S, EF> {
/// Configuration for the online stages
online: OnlineStages<H, B>,
/// Executor factory needs for execution stage
executor_factory: EF,
/// Configuration for the [`FinishStage`] stage.
status_updater: S,
}
impl<H, B, S> DefaultStages<H, B, S> {
impl<H, B, S, EF> DefaultStages<H, B, S, EF> {
/// Create a new set of default stages with default values.
pub fn new(
consensus: Arc<dyn Consensus>,
header_downloader: H,
body_downloader: B,
status_updater: S,
) -> Self {
executor_factory: EF,
) -> Self
where
EF: ExecutorFactory,
{
Self {
online: OnlineStages::new(consensus, header_downloader, body_downloader),
executor_factory,
status_updater,
}
}
}
impl<DB, H, B, S> StageSet<DB> for DefaultStages<H, B, S>
impl<DB, H, B, S, EF> StageSet<DB> for DefaultStages<H, B, S, EF>
where
DB: Database,
H: HeaderDownloader + 'static,
B: BodyDownloader + 'static,
S: StatusUpdater + 'static,
EF: ExecutorFactory,
{
fn builder(self) -> StageSetBuilder<DB> {
self.online
.builder()
.add_set(OfflineStages)
.add_set(OfflineStages::new(self.executor_factory))
.add_stage(FinishStage::new(self.status_updater))
}
}
@ -137,40 +154,47 @@ where
/// - [`HistoryIndexingStages`]
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct OfflineStages;
pub struct OfflineStages<EF: ExecutorFactory> {
/// Executor factory needs for execution stage
pub executor_factory: EF,
}
impl<DB: Database> StageSet<DB> for OfflineStages {
impl<EF: ExecutorFactory> OfflineStages<EF> {
/// Create a new set of ofline stages with default values.
pub fn new(executor_factory: EF) -> Self {
Self { executor_factory }
}
}
impl<EF: ExecutorFactory, DB: Database> StageSet<DB> for OfflineStages<EF> {
fn builder(self) -> StageSetBuilder<DB> {
ExecutionStages::default().builder().add_set(HashingStages).add_set(HistoryIndexingStages)
ExecutionStages::new(self.executor_factory)
.builder()
.add_set(HashingStages)
.add_set(HistoryIndexingStages)
}
}
/// A set containing all stages that are required to execute pre-existing block data.
#[derive(Debug)]
#[non_exhaustive]
pub struct ExecutionStages {
/// The chain specification to use for execution.
chain_spec: ChainSpec,
pub struct ExecutionStages<EF: ExecutorFactory> {
/// Executor factory that will create executors.
executor_factory: EF,
}
impl Default for ExecutionStages {
fn default() -> Self {
Self { chain_spec: reth_primitives::MAINNET.clone() }
}
}
impl ExecutionStages {
impl<EF: ExecutorFactory + 'static> ExecutionStages<EF> {
/// Create a new set of execution stages with default values.
pub fn new(chain_spec: ChainSpec) -> Self {
Self { chain_spec }
pub fn new(executor_factory: EF) -> Self {
Self { executor_factory }
}
}
impl<DB: Database> StageSet<DB> for ExecutionStages {
impl<EF: ExecutorFactory, DB: Database> StageSet<DB> for ExecutionStages<EF> {
fn builder(self) -> StageSetBuilder<DB> {
StageSetBuilder::default()
.add_stage(SenderRecoveryStage::default())
.add_stage(ExecutionStage::<crate::DefaultDB<'_>>::from(self.chain_spec))
.add_stage(ExecutionStage::new(self.executor_factory, 10_000))
}
}

View File

@ -1,6 +1,6 @@
use crate::{
exec_or_return, DefaultDB, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId,
UnwindInput, UnwindOutput,
exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput,
UnwindOutput,
};
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
@ -9,11 +9,9 @@ use reth_db::{
tables,
transaction::{DbTx, DbTxMut},
};
use reth_executor::executor::Executor;
use reth_interfaces::provider::ProviderError;
use reth_primitives::{Address, Block, ChainSpec, U256};
use reth_provider::{LatestStateProviderRef, StateProvider, Transaction};
use reth_revm::database::{State, SubState};
use reth_primitives::{Address, Block, U256};
use reth_provider::{BlockExecutor, ExecutorFactory, LatestStateProviderRef, Transaction};
use tracing::*;
/// The [`StageId`] of the execution stage.
@ -49,33 +47,28 @@ pub const EXECUTION: StageId = StageId("Execution");
/// to [tables::PlainStorageState]
// false positive, we cannot derive it if !DB: Debug.
#[allow(missing_debug_implementations)]
pub struct ExecutionStage<'a, DB = DefaultDB<'a>>
where
DB: StateProvider,
{
pub struct ExecutionStage<EF: ExecutorFactory> {
/// The stage's internal executor
pub executor: Executor<'a, DB>,
pub executor_factory: EF,
/// Commit threshold
pub commit_threshold: u64,
}
impl<'a, DB: StateProvider> From<Executor<'a, DB>> for ExecutionStage<'a, DB> {
fn from(executor: Executor<'a, DB>) -> Self {
Self { executor, commit_threshold: 1_000 }
impl<EF: ExecutorFactory> ExecutionStage<EF> {
/// Create new execution stage with specified config.
pub fn new(executor_factory: EF, commit_threshold: u64) -> Self {
Self { executor_factory, commit_threshold }
}
}
impl<'a, DB: StateProvider> From<ChainSpec> for ExecutionStage<'a, DB> {
fn from(chain_spec: ChainSpec) -> Self {
let executor = Executor::from(chain_spec);
Self::from(executor)
/// Create execution stage with executor factory and default commit threshold set to 10_000
/// blocks
pub fn new_default_threshold(executor_factory: EF) -> Self {
Self { executor_factory, commit_threshold: 10_000 }
}
}
impl<'a, S: StateProvider> ExecutionStage<'a, S> {
/// Execute the stage.
pub fn execute_inner<DB: Database>(
&mut self,
&self,
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
@ -116,7 +109,7 @@ impl<'a, S: StateProvider> ExecutionStage<'a, S> {
// Create state provider with cached state
let mut state_provider = SubState::new(State::new(LatestStateProviderRef::new(&**tx)));
let mut executor = self.executor_factory.with_sp(LatestStateProviderRef::new(&**tx));
// Fetch transactions, execute them and generate results
let mut changesets = Vec::with_capacity(block_batch.len());
@ -154,7 +147,6 @@ impl<'a, S: StateProvider> ExecutionStage<'a, S> {
trace!(target: "sync::stages::execution", number = block_number, txs = transactions.len(), "Executing block");
// Configure the executor to use the current state.
let mut executor = self.executor.with_db(&mut state_provider);
let changeset = executor
.execute_and_verify_receipt(
&Block { header, body: transactions, ommers, withdrawals },
@ -166,7 +158,7 @@ impl<'a, S: StateProvider> ExecutionStage<'a, S> {
}
// put execution results to database
tx.insert_execution_result(changesets, &self.executor.chain_spec, last_block)?;
tx.insert_execution_result(changesets, self.executor_factory.chain_spec(), last_block)?;
let done = !capped;
info!(target: "sync::stages::execution", stage_progress = end_block, done, "Sync iteration finished");
@ -174,15 +166,8 @@ impl<'a, S: StateProvider> ExecutionStage<'a, S> {
}
}
impl<'a, DB: StateProvider> ExecutionStage<'a, DB> {
/// Create new execution stage with specified config.
pub fn new(executor: Executor<'a, DB>, commit_threshold: u64) -> Self {
Self { executor, commit_threshold }
}
}
#[async_trait::async_trait]
impl<State: StateProvider, DB: Database> Stage<DB> for ExecutionStage<'_, State> {
impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
/// Return the id of the stage
fn id(&self) -> StageId {
EXECUTION
@ -306,13 +291,23 @@ mod tests {
mdbx::{test_utils::create_test_db, EnvKind, WriteMap},
models::AccountBeforeTx,
};
use reth_executor::Factory;
use reth_primitives::{
hex_literal::hex, keccak256, Account, ChainSpecBuilder, SealedBlock, StorageEntry, H160,
H256, MAINNET, U256,
H256, U256,
};
use reth_provider::insert_canonical_block;
use reth_rlp::Decodable;
use std::ops::{Deref, DerefMut};
use std::{
ops::{Deref, DerefMut},
sync::Arc,
};
fn stage() -> ExecutionStage<Factory> {
let factory =
Factory::new(Arc::new(ChainSpecBuilder::mainnet().berlin_activated().build()));
ExecutionStage::new(factory, 100)
}
#[tokio::test]
async fn sanity_execution_of_block() {
@ -355,8 +350,7 @@ mod tests {
db_tx.put::<tables::Bytecodes>(code_hash, code.to_vec()).unwrap();
tx.commit().unwrap();
let chain_spec = ChainSpecBuilder::mainnet().berlin_activated().build();
let mut execution_stage = ExecutionStage::<DefaultDB<'_>>::from(chain_spec);
let mut execution_stage = stage();
let output = execution_stage.execute(&mut tx, input).await.unwrap();
tx.commit().unwrap();
assert_eq!(output, ExecOutput { stage_progress: 1, done: true });
@ -440,12 +434,12 @@ mod tests {
tx.commit().unwrap();
// execute
let chain_spec = ChainSpecBuilder::mainnet().berlin_activated().build();
let mut execution_stage = ExecutionStage::<DefaultDB<'_>>::from(chain_spec);
let mut execution_stage = stage();
let _ = execution_stage.execute(&mut tx, input).await.unwrap();
tx.commit().unwrap();
let o = ExecutionStage::<DefaultDB<'_>>::from(MAINNET.clone())
let mut stage = stage();
let o = stage
.unwind(&mut tx, UnwindInput { stage_progress: 1, unwind_to: 0, bad_block: None })
.await
.unwrap();
@ -526,8 +520,7 @@ mod tests {
tx.commit().unwrap();
// execute
let chain_spec = ChainSpecBuilder::mainnet().berlin_activated().build();
let mut execution_stage = ExecutionStage::<DefaultDB<'_>>::from(chain_spec);
let mut execution_stage = stage();
let _ = execution_stage.execute(&mut tx, input).await.unwrap();
tx.commit().unwrap();

View File

@ -47,6 +47,7 @@ where
/// Sets up a clear database at `bench_db_path`.
#[allow(clippy::ptr_arg)]
#[allow(unused)]
fn set_up_db<T>(
bench_db_path: &Path,
pair: &Vec<(<T as Table>::Key, bytes::Bytes, <T as Table>::Value, bytes::Bytes)>,

View File

@ -11,8 +11,9 @@
/// Various provider traits.
mod traits;
pub use traits::{
AccountProvider, BlockHashProvider, BlockIdProvider, BlockProvider, EvmEnvProvider,
HeaderProvider, StateProvider, StateProviderFactory, TransactionsProvider, WithdrawalsProvider,
AccountProvider, BlockExecutor, BlockHashProvider, BlockIdProvider, BlockProvider,
EvmEnvProvider, ExecutorFactory, HeaderProvider, StateProvider, StateProviderFactory,
TransactionsProvider, WithdrawalsProvider,
};
/// Provider trait implementations.

View File

@ -0,0 +1,45 @@
//! Executor Factory
use crate::{execution_result::ExecutionResult, StateProvider};
use reth_interfaces::executor::Error;
use reth_primitives::{Address, Block, ChainSpec, U256};
/// Executor factory that would create the EVM with particular state provider.
///
/// It can be used to mock executor.
pub trait ExecutorFactory: Send + Sync + 'static {
/// The executor produced by the factory
type Executor<T: StateProvider>: BlockExecutor<T>;
/// Executor with [`StateProvider`]
fn with_sp<SP: StateProvider>(&self, sp: SP) -> Self::Executor<SP>;
/// Return internal chainspec
fn chain_spec(&self) -> &ChainSpec;
}
/// An executor capable of executing a block.
pub trait BlockExecutor<SP: StateProvider> {
/// Execute a block.
///
/// The number of `senders` should be equal to the number of transactions in the block.
///
/// If no senders are specified, the `execute` function MUST recover the senders for the
/// provided block's transactions internally. We use this to allow for calculating senders in
/// parallel in e.g. staged sync, so that execution can happen without paying for sender
/// recovery costs.
fn execute(
&mut self,
block: &Block,
total_difficulty: U256,
senders: Option<Vec<Address>>,
) -> Result<ExecutionResult, Error>;
/// Executes the block and checks receipts
fn execute_and_verify_receipt(
&mut self,
block: &Block,
total_difficulty: U256,
senders: Option<Vec<Address>>,
) -> Result<ExecutionResult, Error>;
}

View File

@ -26,3 +26,6 @@ pub use transactions::TransactionsProvider;
mod withdrawals;
pub use withdrawals::WithdrawalsProvider;
mod executor;
pub use executor::{BlockExecutor, ExecutorFactory};