chore: refactor DefaultStages to take StageConfig (#8173)

This commit is contained in:
joshieDo
2024-05-12 12:38:34 +01:00
committed by GitHub
parent 487f7e302b
commit d9f9504dbd
17 changed files with 245 additions and 223 deletions

View File

@ -35,7 +35,7 @@ use reth_provider::{
}; };
use reth_stages::{ use reth_stages::{
sets::DefaultStages, sets::DefaultStages,
stages::{ExecutionStage, ExecutionStageThresholds, SenderRecoveryStage}, stages::{ExecutionStage, ExecutionStageThresholds},
Pipeline, StageSet, Pipeline, StageSet,
}; };
use reth_static_file::StaticFileProducer; use reth_static_file::StaticFileProducer;
@ -109,6 +109,7 @@ impl Command {
.into_task_with(task_executor); .into_task_with(task_executor);
let stage_conf = &config.stages; let stage_conf = &config.stages;
let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default();
let (tip_tx, tip_rx) = watch::channel(B256::ZERO); let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
let executor = block_executor!(self.chain.clone()); let executor = block_executor!(self.chain.clone());
@ -124,11 +125,9 @@ impl Command {
header_downloader, header_downloader,
body_downloader, body_downloader,
executor.clone(), executor.clone(),
stage_conf.etl.clone(), stage_conf.clone(),
prune_modes.clone(),
) )
.set(SenderRecoveryStage {
commit_threshold: stage_conf.sender_recovery.commit_threshold,
})
.set(ExecutionStage::new( .set(ExecutionStage::new(
executor, executor,
ExecutionStageThresholds { ExecutionStageThresholds {
@ -137,12 +136,8 @@ impl Command {
max_cumulative_gas: None, max_cumulative_gas: None,
max_duration: None, max_duration: None,
}, },
stage_conf stage_conf.execution_external_clean_threshold(),
.merkle prune_modes,
.clean_threshold
.max(stage_conf.account_hashing.clean_threshold)
.max(stage_conf.storage_hashing.clean_threshold),
config.prune.clone().map(|prune| prune.segments).unwrap_or_default(),
ExExManagerHandle::empty(), ExExManagerHandle::empty(),
)), )),
) )

View File

@ -21,7 +21,6 @@ use reth_downloaders::{
file_client::{ChunkedFileReader, FileClient, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE}, file_client::{ChunkedFileReader, FileClient, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE},
headers::reverse_headers::ReverseHeadersDownloaderBuilder, headers::reverse_headers::ReverseHeadersDownloaderBuilder,
}; };
use reth_exex::ExExManagerHandle;
use reth_interfaces::p2p::{ use reth_interfaces::p2p::{
bodies::downloader::BodyDownloader, bodies::downloader::BodyDownloader,
headers::downloader::{HeaderDownloader, SyncTarget}, headers::downloader::{HeaderDownloader, SyncTarget},
@ -33,11 +32,7 @@ use reth_provider::{
BlockNumReader, ChainSpecProvider, HeaderProvider, HeaderSyncMode, ProviderError, BlockNumReader, ChainSpecProvider, HeaderProvider, HeaderSyncMode, ProviderError,
ProviderFactory, StageCheckpointReader, StaticFileProviderFactory, ProviderFactory, StageCheckpointReader, StaticFileProviderFactory,
}; };
use reth_stages::{ use reth_stages::{prelude::*, Pipeline, StageSet};
prelude::*,
stages::{ExecutionStage, ExecutionStageThresholds, SenderRecoveryStage},
Pipeline, StageSet,
};
use reth_static_file::StaticFileProducer; use reth_static_file::StaticFileProducer;
use std::{path::PathBuf, sync::Arc}; use std::{path::PathBuf, sync::Arc};
use tokio::sync::watch; use tokio::sync::watch;
@ -273,29 +268,11 @@ where
consensus.clone(), consensus.clone(),
header_downloader, header_downloader,
body_downloader, body_downloader,
executor.clone(),
config.stages.etl.clone(),
)
.set(SenderRecoveryStage {
commit_threshold: config.stages.sender_recovery.commit_threshold,
})
.set(ExecutionStage::new(
executor, executor,
ExecutionStageThresholds { config.stages.clone(),
max_blocks: config.stages.execution.max_blocks, PruneModes::default(),
max_changes: config.stages.execution.max_changes, )
max_cumulative_gas: config.stages.execution.max_cumulative_gas, .builder()
max_duration: config.stages.execution.max_duration,
},
config
.stages
.merkle
.clean_threshold
.max(config.stages.account_hashing.clean_threshold)
.max(config.stages.storage_hashing.clean_threshold),
config.prune.as_ref().map(|prune| prune.segments.clone()).unwrap_or_default(),
ExExManagerHandle::empty(),
))
.disable_all_if(&StageId::STATE_REQUIRED, || should_exec), .disable_all_if(&StageId::STATE_REQUIRED, || should_exec),
) )
.build(provider_factory, static_file_producer); .build(provider_factory, static_file_producer);

View File

@ -16,7 +16,10 @@ use crate::{
use clap::Parser; use clap::Parser;
use reth_beacon_consensus::EthBeaconConsensus; use reth_beacon_consensus::EthBeaconConsensus;
use reth_cli_runner::CliContext; use reth_cli_runner::CliContext;
use reth_config::{config::EtlConfig, Config}; use reth_config::{
config::{EtlConfig, HashingConfig, SenderRecoveryConfig, TransactionLookupConfig},
Config,
};
use reth_db::init_db; use reth_db::init_db;
use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;
use reth_exex::ExExManagerHandle; use reth_exex::ExExManagerHandle;
@ -165,6 +168,7 @@ impl Command {
Some(self.etl_dir.unwrap_or_else(|| EtlConfig::from_datadir(data_dir.data_dir()))), Some(self.etl_dir.unwrap_or_else(|| EtlConfig::from_datadir(data_dir.data_dir()))),
self.etl_file_size.unwrap_or(EtlConfig::default_file_size()), self.etl_file_size.unwrap_or(EtlConfig::default_file_size()),
); );
let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default();
let (mut exec_stage, mut unwind_stage): (Box<dyn Stage<_>>, Option<Box<dyn Stage<_>>>) = let (mut exec_stage, mut unwind_stage): (Box<dyn Stage<_>>, Option<Box<dyn Stage<_>>>) =
match self.stage { match self.stage {
@ -222,7 +226,12 @@ impl Command {
); );
(Box::new(stage), None) (Box::new(stage), None)
} }
StageEnum::Senders => (Box::new(SenderRecoveryStage::new(batch_size)), None), StageEnum::Senders => (
Box::new(SenderRecoveryStage::new(SenderRecoveryConfig {
commit_threshold: batch_size,
})),
None,
),
StageEnum::Execution => { StageEnum::Execution => {
let executor = block_executor!(self.chain.clone()); let executor = block_executor!(self.chain.clone());
( (
@ -235,31 +244,52 @@ impl Command {
max_duration: None, max_duration: None,
}, },
config.stages.merkle.clean_threshold, config.stages.merkle.clean_threshold,
config.prune.map(|prune| prune.segments).unwrap_or_default(), prune_modes,
ExExManagerHandle::empty(), ExExManagerHandle::empty(),
)), )),
None, None,
) )
} }
StageEnum::TxLookup => { StageEnum::TxLookup => (
(Box::new(TransactionLookupStage::new(batch_size, etl_config, None)), None) Box::new(TransactionLookupStage::new(
} TransactionLookupConfig { chunk_size: batch_size },
StageEnum::AccountHashing => { etl_config,
(Box::new(AccountHashingStage::new(1, batch_size, etl_config)), None) prune_modes.transaction_lookup,
} )),
StageEnum::StorageHashing => { None,
(Box::new(StorageHashingStage::new(1, batch_size, etl_config)), None) ),
} StageEnum::AccountHashing => (
Box::new(AccountHashingStage::new(
HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
etl_config,
)),
None,
),
StageEnum::StorageHashing => (
Box::new(StorageHashingStage::new(
HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
etl_config,
)),
None,
),
StageEnum::Merkle => ( StageEnum::Merkle => (
Box::new(MerkleStage::default_execution()), Box::new(MerkleStage::new_execution(config.stages.merkle.clean_threshold)),
Some(Box::new(MerkleStage::default_unwind())), Some(Box::new(MerkleStage::default_unwind())),
), ),
StageEnum::AccountHistory => ( StageEnum::AccountHistory => (
Box::new(IndexAccountHistoryStage::default().with_etl_config(etl_config)), Box::new(IndexAccountHistoryStage::new(
config.stages.index_account_history,
etl_config,
prune_modes.account_history,
)),
None, None,
), ),
StageEnum::StorageHistory => ( StageEnum::StorageHistory => (
Box::new(IndexStorageHistoryStage::default().with_etl_config(etl_config)), Box::new(IndexStorageHistoryStage::new(
config.stages.index_storage_history,
etl_config,
prune_modes.storage_history,
)),
None, None,
), ),
_ => return Ok(()), _ => return Ok(()),

View File

@ -15,11 +15,7 @@ use reth_provider::{
}; };
use reth_stages::{ use reth_stages::{
sets::DefaultStages, sets::DefaultStages,
stages::{ stages::{ExecutionStage, ExecutionStageThresholds},
AccountHashingStage, ExecutionStage, ExecutionStageThresholds, IndexAccountHistoryStage,
IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
TransactionLookupStage,
},
Pipeline, StageSet, Pipeline, StageSet,
}; };
use reth_static_file::StaticFileProducer; use reth_static_file::StaticFileProducer;
@ -133,6 +129,7 @@ impl Command {
let consensus: Arc<dyn Consensus> = let consensus: Arc<dyn Consensus> =
Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec())); Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec()));
let stage_conf = &config.stages; let stage_conf = &config.stages;
let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default();
let (tip_tx, tip_rx) = watch::channel(B256::ZERO); let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
let executor = block_executor!(provider_factory.chain_spec()); let executor = block_executor!(provider_factory.chain_spec());
@ -148,11 +145,9 @@ impl Command {
NoopHeaderDownloader::default(), NoopHeaderDownloader::default(),
NoopBodiesDownloader::default(), NoopBodiesDownloader::default(),
executor.clone(), executor.clone(),
stage_conf.etl.clone(), stage_conf.clone(),
prune_modes.clone(),
) )
.set(SenderRecoveryStage {
commit_threshold: stage_conf.sender_recovery.commit_threshold,
})
.set(ExecutionStage::new( .set(ExecutionStage::new(
executor, executor,
ExecutionStageThresholds { ExecutionStageThresholds {
@ -161,20 +156,10 @@ impl Command {
max_cumulative_gas: None, max_cumulative_gas: None,
max_duration: None, max_duration: None,
}, },
stage_conf stage_conf.execution_external_clean_threshold(),
.merkle prune_modes,
.clean_threshold
.max(stage_conf.account_hashing.clean_threshold)
.max(stage_conf.storage_hashing.clean_threshold),
config.prune.clone().map(|prune| prune.segments).unwrap_or_default(),
ExExManagerHandle::empty(), ExExManagerHandle::empty(),
)) )),
.set(AccountHashingStage::default())
.set(StorageHashingStage::default())
.set(MerkleStage::default_unwind())
.set(TransactionLookupStage::default())
.set(IndexAccountHistoryStage::default())
.set(IndexStorageHistoryStage::default()),
) )
.build( .build(
provider_factory.clone(), provider_factory.clone(),

View File

@ -96,6 +96,19 @@ pub struct StageConfig {
pub etl: EtlConfig, pub etl: EtlConfig,
} }
impl StageConfig {
/// The highest threshold (in number of blocks) for switching between incremental and full
/// calculations across `MerkleStage`, `AccountHashingStage` and `StorageHashingStage`. This is
/// required to figure out if can prune or not changesets on subsequent pipeline runs during
/// `ExecutionStage`
pub fn execution_external_clean_threshold(&self) -> u64 {
self.merkle
.clean_threshold
.max(self.account_hashing.clean_threshold)
.max(self.storage_hashing.clean_threshold)
}
}
/// Header stage configuration. /// Header stage configuration.
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Serialize)] #[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Serialize)]
#[serde(default)] #[serde(default)]

View File

@ -6,7 +6,7 @@ use crate::{
use reth_blockchain_tree::{ use reth_blockchain_tree::{
config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree, config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree,
}; };
use reth_config::config::EtlConfig; use reth_config::config::StageConfig;
use reth_consensus::{test_utils::TestConsensus, Consensus}; use reth_consensus::{test_utils::TestConsensus, Consensus};
use reth_db::{test_utils::TempDatabase, DatabaseEnv as DE}; use reth_db::{test_utils::TempDatabase, DatabaseEnv as DE};
use reth_downloaders::{ use reth_downloaders::{
@ -375,7 +375,8 @@ where
header_downloader, header_downloader,
body_downloader, body_downloader,
executor_factory.clone(), executor_factory.clone(),
EtlConfig::default(), StageConfig::default(),
PruneModes::default(),
)) ))
} }
}; };

View File

@ -18,15 +18,7 @@ use reth_node_core::{
primitives::{BlockNumber, B256}, primitives::{BlockNumber, B256},
}; };
use reth_provider::{HeaderSyncMode, ProviderFactory}; use reth_provider::{HeaderSyncMode, ProviderFactory};
use reth_stages::{ use reth_stages::{prelude::DefaultStages, stages::ExecutionStage, Pipeline, StageSet};
prelude::DefaultStages,
stages::{
AccountHashingStage, ExecutionStage, ExecutionStageThresholds, IndexAccountHistoryStage,
IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
TransactionLookupStage,
},
Pipeline, StageSet,
};
use reth_static_file::StaticFileProducer; use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor; use reth_tasks::TaskExecutor;
use reth_tracing::tracing::debug; use reth_tracing::tracing::debug;
@ -131,56 +123,19 @@ where
header_downloader, header_downloader,
body_downloader, body_downloader,
executor.clone(), executor.clone(),
stage_config.etl.clone(), stage_config.clone(),
prune_modes.clone(),
) )
.set(SenderRecoveryStage {
commit_threshold: stage_config.sender_recovery.commit_threshold,
})
.set( .set(
ExecutionStage::new( ExecutionStage::new(
executor, executor,
ExecutionStageThresholds { stage_config.execution.into(),
max_blocks: stage_config.execution.max_blocks, stage_config.execution_external_clean_threshold(),
max_changes: stage_config.execution.max_changes, prune_modes,
max_cumulative_gas: stage_config.execution.max_cumulative_gas,
max_duration: stage_config.execution.max_duration,
},
stage_config
.merkle
.clean_threshold
.max(stage_config.account_hashing.clean_threshold)
.max(stage_config.storage_hashing.clean_threshold),
prune_modes.clone(),
exex_manager_handle, exex_manager_handle,
) )
.with_metrics_tx(metrics_tx), .with_metrics_tx(metrics_tx),
) ),
.set(AccountHashingStage::new(
stage_config.account_hashing.clean_threshold,
stage_config.account_hashing.commit_threshold,
stage_config.etl.clone(),
))
.set(StorageHashingStage::new(
stage_config.storage_hashing.clean_threshold,
stage_config.storage_hashing.commit_threshold,
stage_config.etl.clone(),
))
.set(MerkleStage::new_execution(stage_config.merkle.clean_threshold))
.set(TransactionLookupStage::new(
stage_config.transaction_lookup.chunk_size,
stage_config.etl.clone(),
prune_modes.transaction_lookup,
))
.set(IndexAccountHistoryStage::new(
stage_config.index_account_history.commit_threshold,
prune_modes.account_history,
stage_config.etl.clone(),
))
.set(IndexStorageHistoryStage::new(
stage_config.index_storage_history.commit_threshold,
prune_modes.storage_history,
stage_config.etl.clone(),
)),
) )
.build(provider_factory, static_file_producer); .build(provider_factory, static_file_producer);

View File

@ -2,7 +2,7 @@
use criterion::{criterion_main, measurement::WallTime, BenchmarkGroup, Criterion}; use criterion::{criterion_main, measurement::WallTime, BenchmarkGroup, Criterion};
#[cfg(not(target_os = "windows"))] #[cfg(not(target_os = "windows"))]
use pprof::criterion::{Output, PProfProfiler}; use pprof::criterion::{Output, PProfProfiler};
use reth_config::config::EtlConfig; use reth_config::config::{EtlConfig, TransactionLookupConfig};
use reth_db::{test_utils::TempDatabase, DatabaseEnv}; use reth_db::{test_utils::TempDatabase, DatabaseEnv};
use reth_primitives::{stage::StageCheckpoint, BlockNumber}; use reth_primitives::{stage::StageCheckpoint, BlockNumber};
@ -87,7 +87,11 @@ fn transaction_lookup(c: &mut Criterion, runtime: &Runtime) {
let mut group = c.benchmark_group("Stages"); let mut group = c.benchmark_group("Stages");
// don't need to run each stage for that many times // don't need to run each stage for that many times
group.sample_size(10); group.sample_size(10);
let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, EtlConfig::default(), None); let stage = TransactionLookupStage::new(
TransactionLookupConfig { chunk_size: DEFAULT_NUM_BLOCKS },
EtlConfig::default(),
None,
);
let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS); let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS);

View File

@ -28,7 +28,7 @@
//! # use reth_provider::HeaderSyncMode; //! # use reth_provider::HeaderSyncMode;
//! # use reth_provider::test_utils::create_test_provider_factory; //! # use reth_provider::test_utils::create_test_provider_factory;
//! # use reth_static_file::StaticFileProducer; //! # use reth_static_file::StaticFileProducer;
//! # use reth_config::config::EtlConfig; //! # use reth_config::config::StageConfig;
//! # use reth_consensus::Consensus; //! # use reth_consensus::Consensus;
//! # use reth_consensus::test_utils::TestConsensus; //! # use reth_consensus::test_utils::TestConsensus;
//! # //! #
@ -62,7 +62,8 @@
//! headers_downloader, //! headers_downloader,
//! bodies_downloader, //! bodies_downloader,
//! executor_provider, //! executor_provider,
//! EtlConfig::default(), //! StageConfig::default(),
//! PruneModes::default(),
//! )) //! ))
//! .build(provider_factory, static_file_producer); //! .build(provider_factory, static_file_producer);
//! ``` //! ```

View File

@ -17,7 +17,7 @@
//! # use reth_provider::StaticFileProviderFactory; //! # use reth_provider::StaticFileProviderFactory;
//! # use reth_provider::test_utils::create_test_provider_factory; //! # use reth_provider::test_utils::create_test_provider_factory;
//! # use reth_static_file::StaticFileProducer; //! # use reth_static_file::StaticFileProducer;
//! # use reth_config::config::EtlConfig; //! # use reth_config::config::StageConfig;
//! # use reth_evm::execute::BlockExecutorProvider; //! # use reth_evm::execute::BlockExecutorProvider;
//! //!
//! # fn create(exec: impl BlockExecutorProvider) { //! # fn create(exec: impl BlockExecutorProvider) {
@ -30,7 +30,7 @@
//! ); //! );
//! // Build a pipeline with all offline stages. //! // Build a pipeline with all offline stages.
//! let pipeline = Pipeline::builder() //! let pipeline = Pipeline::builder()
//! .add_stages(OfflineStages::new(exec, EtlConfig::default())) //! .add_stages(OfflineStages::new(exec, StageConfig::default(), PruneModes::default()))
//! .build(provider_factory, static_file_producer); //! .build(provider_factory, static_file_producer);
//! //!
//! # } //! # }
@ -43,13 +43,14 @@ use crate::{
}, },
StageSet, StageSetBuilder, StageSet, StageSetBuilder,
}; };
use reth_config::config::EtlConfig; use reth_config::config::StageConfig;
use reth_consensus::Consensus; use reth_consensus::Consensus;
use reth_db::database::Database; use reth_db::database::Database;
use reth_evm::execute::BlockExecutorProvider; use reth_evm::execute::BlockExecutorProvider;
use reth_interfaces::p2p::{ use reth_interfaces::p2p::{
bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader, bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader,
}; };
use reth_primitives::PruneModes;
use reth_provider::{HeaderSyncGapProvider, HeaderSyncMode}; use reth_provider::{HeaderSyncGapProvider, HeaderSyncMode};
use std::sync::Arc; use std::sync::Arc;
@ -80,12 +81,15 @@ pub struct DefaultStages<Provider, H, B, EF> {
online: OnlineStages<Provider, H, B>, online: OnlineStages<Provider, H, B>,
/// Executor factory needs for execution stage /// Executor factory needs for execution stage
executor_factory: EF, executor_factory: EF,
/// ETL configuration /// Configuration for each stage in the pipeline
etl_config: EtlConfig, stages_config: StageConfig,
/// Prune configuration for every segment that can be pruned
prune_modes: PruneModes,
} }
impl<Provider, H, B, E> DefaultStages<Provider, H, B, E> { impl<Provider, H, B, E> DefaultStages<Provider, H, B, E> {
/// Create a new set of default stages with default values. /// Create a new set of default stages with default values.
#[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
provider: Provider, provider: Provider,
header_mode: HeaderSyncMode, header_mode: HeaderSyncMode,
@ -93,7 +97,8 @@ impl<Provider, H, B, E> DefaultStages<Provider, H, B, E> {
header_downloader: H, header_downloader: H,
body_downloader: B, body_downloader: B,
executor_factory: E, executor_factory: E,
etl_config: EtlConfig, stages_config: StageConfig,
prune_modes: PruneModes,
) -> Self ) -> Self
where where
E: BlockExecutorProvider, E: BlockExecutorProvider,
@ -105,10 +110,11 @@ impl<Provider, H, B, E> DefaultStages<Provider, H, B, E> {
consensus, consensus,
header_downloader, header_downloader,
body_downloader, body_downloader,
etl_config.clone(), stages_config.clone(),
), ),
executor_factory, executor_factory,
etl_config, stages_config,
prune_modes,
} }
} }
} }
@ -121,11 +127,12 @@ where
pub fn add_offline_stages<DB: Database>( pub fn add_offline_stages<DB: Database>(
default_offline: StageSetBuilder<DB>, default_offline: StageSetBuilder<DB>,
executor_factory: E, executor_factory: E,
etl_config: EtlConfig, stages_config: StageConfig,
prune_modes: PruneModes,
) -> StageSetBuilder<DB> { ) -> StageSetBuilder<DB> {
StageSetBuilder::default() StageSetBuilder::default()
.add_set(default_offline) .add_set(default_offline)
.add_set(OfflineStages::new(executor_factory, etl_config)) .add_set(OfflineStages::new(executor_factory, stages_config, prune_modes))
.add_stage(FinishStage) .add_stage(FinishStage)
} }
} }
@ -139,7 +146,12 @@ where
DB: Database + 'static, DB: Database + 'static,
{ {
fn builder(self) -> StageSetBuilder<DB> { fn builder(self) -> StageSetBuilder<DB> {
Self::add_offline_stages(self.online.builder(), self.executor_factory, self.etl_config) Self::add_offline_stages(
self.online.builder(),
self.executor_factory,
self.stages_config.clone(),
self.prune_modes,
)
} }
} }
@ -159,8 +171,8 @@ pub struct OnlineStages<Provider, H, B> {
header_downloader: H, header_downloader: H,
/// The block body downloader /// The block body downloader
body_downloader: B, body_downloader: B,
/// ETL configuration /// Configuration for each stage in the pipeline
etl_config: EtlConfig, stages_config: StageConfig,
} }
impl<Provider, H, B> OnlineStages<Provider, H, B> { impl<Provider, H, B> OnlineStages<Provider, H, B> {
@ -171,9 +183,9 @@ impl<Provider, H, B> OnlineStages<Provider, H, B> {
consensus: Arc<dyn Consensus>, consensus: Arc<dyn Consensus>,
header_downloader: H, header_downloader: H,
body_downloader: B, body_downloader: B,
etl_config: EtlConfig, stages_config: StageConfig,
) -> Self { ) -> Self {
Self { provider, header_mode, consensus, header_downloader, body_downloader, etl_config } Self { provider, header_mode, consensus, header_downloader, body_downloader, stages_config }
} }
} }
@ -198,7 +210,7 @@ where
mode: HeaderSyncMode, mode: HeaderSyncMode,
header_downloader: H, header_downloader: H,
consensus: Arc<dyn Consensus>, consensus: Arc<dyn Consensus>,
etl_config: EtlConfig, stages_config: StageConfig,
) -> StageSetBuilder<DB> { ) -> StageSetBuilder<DB> {
StageSetBuilder::default() StageSetBuilder::default()
.add_stage(HeaderStage::new( .add_stage(HeaderStage::new(
@ -206,7 +218,7 @@ where
header_downloader, header_downloader,
mode, mode,
consensus.clone(), consensus.clone(),
etl_config, stages_config.etl,
)) ))
.add_stage(bodies) .add_stage(bodies)
} }
@ -226,7 +238,7 @@ where
self.header_downloader, self.header_downloader,
self.header_mode, self.header_mode,
self.consensus.clone(), self.consensus.clone(),
self.etl_config.clone(), self.stages_config.etl.clone(),
)) ))
.add_stage(BodyStage::new(self.body_downloader)) .add_stage(BodyStage::new(self.body_downloader))
} }
@ -244,14 +256,16 @@ where
pub struct OfflineStages<EF> { pub struct OfflineStages<EF> {
/// Executor factory needs for execution stage /// Executor factory needs for execution stage
pub executor_factory: EF, pub executor_factory: EF,
/// ETL configuration /// Configuration for each stage in the pipeline
etl_config: EtlConfig, stages_config: StageConfig,
/// Prune configuration for every segment that can be pruned
prune_modes: PruneModes,
} }
impl<EF> OfflineStages<EF> { impl<EF> OfflineStages<EF> {
/// Create a new set of offline stages with default values. /// Create a new set of offline stages with default values.
pub fn new(executor_factory: EF, etl_config: EtlConfig) -> Self { pub fn new(executor_factory: EF, stages_config: StageConfig, prune_modes: PruneModes) -> Self {
Self { executor_factory, etl_config } Self { executor_factory, stages_config, prune_modes }
} }
} }
@ -261,10 +275,17 @@ where
DB: Database, DB: Database,
{ {
fn builder(self) -> StageSetBuilder<DB> { fn builder(self) -> StageSetBuilder<DB> {
ExecutionStages::new(self.executor_factory) ExecutionStages::new(
.builder() self.executor_factory,
.add_set(HashingStages { etl_config: self.etl_config.clone() }) self.stages_config.clone(),
.add_set(HistoryIndexingStages { etl_config: self.etl_config }) self.prune_modes.clone(),
)
.builder()
.add_set(HashingStages { stages_config: self.stages_config.clone() })
.add_set(HistoryIndexingStages {
stages_config: self.stages_config.clone(),
prune_modes: self.prune_modes,
})
} }
} }
@ -274,12 +295,16 @@ where
pub struct ExecutionStages<E> { pub struct ExecutionStages<E> {
/// Executor factory that will create executors. /// Executor factory that will create executors.
executor_factory: E, executor_factory: E,
/// Configuration for each stage in the pipeline
stages_config: StageConfig,
/// Prune configuration for every segment that can be pruned
prune_modes: PruneModes,
} }
impl<E> ExecutionStages<E> { impl<E> ExecutionStages<E> {
/// Create a new set of execution stages with default values. /// Create a new set of execution stages with default values.
pub fn new(executor_factory: E) -> Self { pub fn new(executor_factory: E, stages_config: StageConfig, prune_modes: PruneModes) -> Self {
Self { executor_factory } Self { executor_factory, stages_config, prune_modes }
} }
} }
@ -290,8 +315,13 @@ where
{ {
fn builder(self) -> StageSetBuilder<DB> { fn builder(self) -> StageSetBuilder<DB> {
StageSetBuilder::default() StageSetBuilder::default()
.add_stage(SenderRecoveryStage::default()) .add_stage(SenderRecoveryStage::new(self.stages_config.sender_recovery))
.add_stage(ExecutionStage::new_with_executor(self.executor_factory)) .add_stage(ExecutionStage::from_config(
self.executor_factory,
self.stages_config.execution,
self.stages_config.execution_external_clean_threshold(),
self.prune_modes,
))
} }
} }
@ -299,17 +329,23 @@ where
#[derive(Debug, Default)] #[derive(Debug, Default)]
#[non_exhaustive] #[non_exhaustive]
pub struct HashingStages { pub struct HashingStages {
/// ETL configuration /// Configuration for each stage in the pipeline
etl_config: EtlConfig, stages_config: StageConfig,
} }
impl<DB: Database> StageSet<DB> for HashingStages { impl<DB: Database> StageSet<DB> for HashingStages {
fn builder(self) -> StageSetBuilder<DB> { fn builder(self) -> StageSetBuilder<DB> {
StageSetBuilder::default() StageSetBuilder::default()
.add_stage(MerkleStage::default_unwind()) .add_stage(MerkleStage::default_unwind())
.add_stage(AccountHashingStage::default().with_etl_config(self.etl_config.clone())) .add_stage(AccountHashingStage::new(
.add_stage(StorageHashingStage::default().with_etl_config(self.etl_config)) self.stages_config.account_hashing,
.add_stage(MerkleStage::default_execution()) self.stages_config.etl.clone(),
))
.add_stage(StorageHashingStage::new(
self.stages_config.storage_hashing,
self.stages_config.etl.clone(),
))
.add_stage(MerkleStage::new_execution(self.stages_config.merkle.clean_threshold))
} }
} }
@ -317,15 +353,29 @@ impl<DB: Database> StageSet<DB> for HashingStages {
#[derive(Debug, Default)] #[derive(Debug, Default)]
#[non_exhaustive] #[non_exhaustive]
pub struct HistoryIndexingStages { pub struct HistoryIndexingStages {
/// ETL configuration /// Configuration for each stage in the pipeline
etl_config: EtlConfig, stages_config: StageConfig,
/// Prune configuration for every segment that can be pruned
prune_modes: PruneModes,
} }
impl<DB: Database> StageSet<DB> for HistoryIndexingStages { impl<DB: Database> StageSet<DB> for HistoryIndexingStages {
fn builder(self) -> StageSetBuilder<DB> { fn builder(self) -> StageSetBuilder<DB> {
StageSetBuilder::default() StageSetBuilder::default()
.add_stage(TransactionLookupStage::default().with_etl_config(self.etl_config.clone())) .add_stage(TransactionLookupStage::new(
.add_stage(IndexStorageHistoryStage::default().with_etl_config(self.etl_config.clone())) self.stages_config.transaction_lookup,
.add_stage(IndexAccountHistoryStage::default().with_etl_config(self.etl_config)) self.stages_config.etl.clone(),
self.prune_modes.transaction_lookup,
))
.add_stage(IndexStorageHistoryStage::new(
self.stages_config.index_storage_history,
self.stages_config.etl.clone(),
self.prune_modes.account_history,
))
.add_stage(IndexAccountHistoryStage::new(
self.stages_config.index_account_history,
self.stages_config.etl.clone(),
self.prune_modes.storage_history,
))
} }
} }

View File

@ -1,5 +1,6 @@
use crate::stages::MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD; use crate::stages::MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD;
use num_traits::Zero; use num_traits::Zero;
use reth_config::config::ExecutionConfig;
use reth_db::{ use reth_db::{
cursor::DbCursorRO, database::Database, static_file::HeaderMask, tables, transaction::DbTx, cursor::DbCursorRO, database::Database, static_file::HeaderMask, tables, transaction::DbTx,
}; };
@ -111,6 +112,22 @@ impl<E> ExecutionStage<E> {
) )
} }
/// Create new instance of [ExecutionStage] from configuration.
pub fn from_config(
executor_provider: E,
config: ExecutionConfig,
external_clean_threshold: u64,
prune_modes: PruneModes,
) -> Self {
Self::new(
executor_provider,
config.into(),
external_clean_threshold,
prune_modes,
ExExManagerHandle::empty(),
)
}
/// Set the metric events sender. /// Set the metric events sender.
pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self { pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self {
self.metrics_tx = Some(metrics_tx); self.metrics_tx = Some(metrics_tx);
@ -540,6 +557,17 @@ impl ExecutionStageThresholds {
} }
} }
impl From<ExecutionConfig> for ExecutionStageThresholds {
fn from(config: ExecutionConfig) -> Self {
ExecutionStageThresholds {
max_blocks: config.max_blocks,
max_changes: config.max_changes,
max_cumulative_gas: config.max_cumulative_gas,
max_duration: config.max_duration,
}
}
}
/// Returns a `StaticFileProviderRWRefMut` static file producer after performing a consistency /// Returns a `StaticFileProviderRWRefMut` static file producer after performing a consistency
/// check. /// check.
/// ///

View File

@ -1,5 +1,5 @@
use itertools::Itertools; use itertools::Itertools;
use reth_config::config::EtlConfig; use reth_config::config::{EtlConfig, HashingConfig};
use reth_db::{ use reth_db::{
cursor::{DbCursorRO, DbCursorRW}, cursor::{DbCursorRO, DbCursorRW},
database::Database, database::Database,
@ -44,14 +44,12 @@ pub struct AccountHashingStage {
impl AccountHashingStage { impl AccountHashingStage {
/// Create new instance of [AccountHashingStage]. /// Create new instance of [AccountHashingStage].
pub fn new(clean_threshold: u64, commit_threshold: u64, etl_config: EtlConfig) -> Self { pub fn new(config: HashingConfig, etl_config: EtlConfig) -> Self {
Self { clean_threshold, commit_threshold, etl_config } Self {
} clean_threshold: config.clean_threshold,
commit_threshold: config.commit_threshold,
/// Set the ETL configuration to use. etl_config,
pub fn with_etl_config(mut self, etl_config: EtlConfig) -> Self { }
self.etl_config = etl_config;
self
} }
} }

View File

@ -1,5 +1,5 @@
use itertools::Itertools; use itertools::Itertools;
use reth_config::config::EtlConfig; use reth_config::config::{EtlConfig, HashingConfig};
use reth_db::{ use reth_db::{
codecs::CompactU256, codecs::CompactU256,
cursor::{DbCursorRO, DbDupCursorRW}, cursor::{DbCursorRO, DbDupCursorRW},
@ -45,14 +45,12 @@ pub struct StorageHashingStage {
impl StorageHashingStage { impl StorageHashingStage {
/// Create new instance of [StorageHashingStage]. /// Create new instance of [StorageHashingStage].
pub fn new(clean_threshold: u64, commit_threshold: u64, etl_config: EtlConfig) -> Self { pub fn new(config: HashingConfig, etl_config: EtlConfig) -> Self {
Self { clean_threshold, commit_threshold, etl_config } Self {
} clean_threshold: config.clean_threshold,
commit_threshold: config.commit_threshold,
/// Set the ETL configuration to use. etl_config,
pub fn with_etl_config(mut self, etl_config: EtlConfig) -> Self { }
self.etl_config = etl_config;
self
} }
} }

View File

@ -1,5 +1,5 @@
use super::{collect_history_indices, load_history_indices}; use super::{collect_history_indices, load_history_indices};
use reth_config::config::EtlConfig; use reth_config::config::{EtlConfig, IndexHistoryConfig};
use reth_db::{ use reth_db::{
database::Database, models::ShardedKey, table::Decode, tables, transaction::DbTxMut, database::Database, models::ShardedKey, table::Decode, tables, transaction::DbTxMut,
}; };
@ -31,17 +31,11 @@ pub struct IndexAccountHistoryStage {
impl IndexAccountHistoryStage { impl IndexAccountHistoryStage {
/// Create new instance of [IndexAccountHistoryStage]. /// Create new instance of [IndexAccountHistoryStage].
pub fn new( pub fn new(
commit_threshold: u64, config: IndexHistoryConfig,
prune_mode: Option<PruneMode>,
etl_config: EtlConfig, etl_config: EtlConfig,
prune_mode: Option<PruneMode>,
) -> Self { ) -> Self {
Self { commit_threshold, prune_mode, etl_config } Self { commit_threshold: config.commit_threshold, etl_config, prune_mode }
}
/// Set the ETL configuration to use.
pub fn with_etl_config(mut self, etl_config: EtlConfig) -> Self {
self.etl_config = etl_config;
self
} }
} }

View File

@ -1,5 +1,5 @@
use super::{collect_history_indices, load_history_indices}; use super::{collect_history_indices, load_history_indices};
use reth_config::config::EtlConfig; use reth_config::config::{EtlConfig, IndexHistoryConfig};
use reth_db::{ use reth_db::{
database::Database, database::Database,
models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress}, models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress},
@ -35,17 +35,11 @@ pub struct IndexStorageHistoryStage {
impl IndexStorageHistoryStage { impl IndexStorageHistoryStage {
/// Create new instance of [IndexStorageHistoryStage]. /// Create new instance of [IndexStorageHistoryStage].
pub fn new( pub fn new(
commit_threshold: u64, config: IndexHistoryConfig,
prune_mode: Option<PruneMode>,
etl_config: EtlConfig, etl_config: EtlConfig,
prune_mode: Option<PruneMode>,
) -> Self { ) -> Self {
Self { commit_threshold, prune_mode, etl_config } Self { commit_threshold: config.commit_threshold, prune_mode, etl_config }
}
/// Set the ETL configuration to use.
pub fn with_etl_config(mut self, etl_config: EtlConfig) -> Self {
self.etl_config = etl_config;
self
} }
} }

View File

@ -1,3 +1,4 @@
use reth_config::config::SenderRecoveryConfig;
use reth_consensus::ConsensusError; use reth_consensus::ConsensusError;
use reth_db::{ use reth_db::{
cursor::DbCursorRW, cursor::DbCursorRW,
@ -42,8 +43,8 @@ pub struct SenderRecoveryStage {
impl SenderRecoveryStage { impl SenderRecoveryStage {
/// Create new instance of [SenderRecoveryStage]. /// Create new instance of [SenderRecoveryStage].
pub fn new(commit_threshold: u64) -> Self { pub fn new(config: SenderRecoveryConfig) -> Self {
Self { commit_threshold } Self { commit_threshold: config.commit_threshold }
} }
} }

View File

@ -1,5 +1,5 @@
use num_traits::Zero; use num_traits::Zero;
use reth_config::config::EtlConfig; use reth_config::config::{EtlConfig, TransactionLookupConfig};
use reth_db::{ use reth_db::{
cursor::{DbCursorRO, DbCursorRW}, cursor::{DbCursorRO, DbCursorRW},
database::Database, database::Database,
@ -45,14 +45,12 @@ impl Default for TransactionLookupStage {
impl TransactionLookupStage { impl TransactionLookupStage {
/// Create new instance of [TransactionLookupStage]. /// Create new instance of [TransactionLookupStage].
pub fn new(chunk_size: u64, etl_config: EtlConfig, prune_mode: Option<PruneMode>) -> Self { pub fn new(
Self { chunk_size, etl_config, prune_mode } config: TransactionLookupConfig,
} etl_config: EtlConfig,
prune_mode: Option<PruneMode>,
/// Set the ETL configuration to use. ) -> Self {
pub fn with_etl_config(mut self, etl_config: EtlConfig) -> Self { Self { chunk_size: config.chunk_size, etl_config, prune_mode }
self.etl_config = etl_config;
self
} }
} }