refactor: Execution Stage owns Executor (#1568)

Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
Co-authored-by: Oliver Nordbjerg <hi@notbjerg.me>
This commit is contained in:
Georgios Konstantopoulos
2023-02-28 16:55:51 -07:00
committed by GitHub
parent 71bc1451af
commit 4285186dbd
12 changed files with 181 additions and 147 deletions

View File

@ -27,6 +27,7 @@ use reth_staged_sync::{
use reth_stages::{
prelude::*,
stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage},
DefaultDB,
};
use std::sync::Arc;
use tracing::{debug, info};
@ -156,9 +157,11 @@ impl ImportCommand {
.set(SenderRecoveryStage {
commit_threshold: config.stages.sender_recovery.commit_threshold,
})
.set(ExecutionStage {
chain_spec: self.chain.clone(),
commit_threshold: config.stages.execution.commit_threshold,
.set({
let mut stage: ExecutionStage<'_, DefaultDB<'_>> =
ExecutionStage::from(self.chain.clone());
stage.commit_threshold = config.stages.execution.commit_threshold;
stage
}),
)
.with_max_block(0)

View File

@ -7,8 +7,9 @@ use eyre::Result;
use reth_db::{
cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx,
};
use reth_primitives::MAINNET;
use reth_provider::Transaction;
use reth_stages::{stages::ExecutionStage, Stage, StageId, UnwindInput};
use reth_stages::{stages::ExecutionStage, DefaultDB, Stage, StageId, UnwindInput};
use std::ops::DerefMut;
use tracing::info;
@ -96,7 +97,7 @@ async fn unwind_and_copy<DB: Database>(
output_db: &reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
) -> eyre::Result<()> {
let mut unwind_tx = Transaction::new(db_tool.db)?;
let mut exec_stage = ExecutionStage::default();
let mut exec_stage: ExecutionStage<'_, DefaultDB<'_>> = ExecutionStage::from(MAINNET.clone());
exec_stage
.unwind(
@ -125,7 +126,7 @@ async fn dry_run(
info!(target: "reth::cli", "Executing stage. [dry-run]");
let mut tx = Transaction::new(&output_db)?;
let mut exec_stage = ExecutionStage::default();
let mut exec_stage: ExecutionStage<'_, DefaultDB<'_>> = ExecutionStage::from(MAINNET.clone());
exec_stage
.execute(

View File

@ -50,6 +50,7 @@ use reth_staged_sync::{
use reth_stages::{
prelude::*,
stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage, FINISH},
DefaultDB,
};
use reth_tasks::TaskExecutor;
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
@ -448,9 +449,11 @@ impl Command {
.set(SenderRecoveryStage {
commit_threshold: stage_conf.sender_recovery.commit_threshold,
})
.set(ExecutionStage {
chain_spec: self.chain.clone(),
commit_threshold: stage_conf.execution.commit_threshold,
.set({
let mut stage: ExecutionStage<'_, DefaultDB<'_>> =
ExecutionStage::from(self.chain.clone());
stage.commit_threshold = stage_conf.execution.commit_threshold;
stage
}),
)
.build();

View File

@ -17,7 +17,7 @@ use reth_staged_sync::{
};
use reth_stages::{
stages::{BodyStage, ExecutionStage, SenderRecoveryStage},
ExecInput, Stage, StageId, UnwindInput,
DefaultDB, ExecInput, Stage, StageId, UnwindInput,
};
use std::{net::SocketAddr, sync::Arc};
use tracing::*;
@ -171,8 +171,8 @@ impl Command {
stage.execute(&mut tx, input).await?;
}
StageEnum::Execution => {
let mut stage =
ExecutionStage { chain_spec: self.chain.clone(), commit_threshold: num_blocks };
let mut stage = ExecutionStage::<DefaultDB<'_>>::from(self.chain.clone());
stage.commit_threshold = num_blocks;
if !self.skip_unwind {
stage.unwind(&mut tx, unwind).await?;
}

View File

@ -15,7 +15,7 @@ use reth_primitives::{
};
use reth_provider::Transaction;
use reth_rlp::Decodable;
use reth_stages::{stages::ExecutionStage, ExecInput, Stage, StageId};
use reth_stages::{stages::ExecutionStage, DefaultDB, ExecInput, Stage, StageId};
use std::{
collections::HashMap,
ffi::OsStr,
@ -193,7 +193,7 @@ pub async fn run_test(path: PathBuf) -> eyre::Result<TestOutcome> {
// Initialize the execution stage
// Hardcode the chain_id to Ethereum 1.
let mut stage = ExecutionStage::new(chain_spec, 1000);
let mut stage = ExecutionStage::<DefaultDB<'_>>::from(chain_spec);
// Call execution stage
let input = ExecInput {