From 5d6ac4c815c562677d7ae6ad6b422b55ef4ed8e2 Mon Sep 17 00:00:00 2001 From: Misha Date: Wed, 13 Mar 2024 14:51:30 +0100 Subject: [PATCH] Make ETL file size configurable (#6927) Co-authored-by: Alexey Shekhirin Co-authored-by: joshieDo --- bin/reth/src/commands/debug_cmd/execution.rs | 1 + bin/reth/src/commands/import.rs | 4 ++++ bin/reth/src/commands/stage/run.rs | 8 ++++++- book/cli/reth/node.md | 2 +- book/cli/reth/stage/run.md | 3 +++ book/run/config.md | 13 +++++++++++ crates/config/src/config.rs | 18 ++++++++++++++- .../consensus/beacon/src/engine/test_utils.rs | 1 + crates/node-core/src/node_config.rs | 2 ++ crates/stages/benches/criterion.rs | 2 +- crates/stages/src/lib.rs | 1 + crates/stages/src/sets.rs | 17 ++++++++++++-- crates/stages/src/stages/headers.rs | 6 +++-- crates/stages/src/stages/tx_lookup.rs | 23 ++++++++++++++----- 14 files changed, 87 insertions(+), 14 deletions(-) diff --git a/bin/reth/src/commands/debug_cmd/execution.rs b/bin/reth/src/commands/debug_cmd/execution.rs index f0b65da5c..4714056ad 100644 --- a/bin/reth/src/commands/debug_cmd/execution.rs +++ b/bin/reth/src/commands/debug_cmd/execution.rs @@ -127,6 +127,7 @@ impl Command { header_downloader, body_downloader, factory.clone(), + stage_conf.etl.etl_file_size, ) .set(SenderRecoveryStage { commit_threshold: stage_conf.sender_recovery.commit_threshold, diff --git a/bin/reth/src/commands/import.rs b/bin/reth/src/commands/import.rs index 2210f4dba..41264bf98 100644 --- a/bin/reth/src/commands/import.rs +++ b/bin/reth/src/commands/import.rs @@ -178,6 +178,9 @@ impl ImportCommand { reth_revm::EvmProcessorFactory::new(self.chain.clone(), EthEvmConfig::default()); let max_block = file_client.max_block().unwrap_or(0); + + let etl_file_size = config.stages.etl.etl_file_size; + let mut pipeline = Pipeline::builder() .with_tip_sender(tip_tx) // we want to sync all blocks the file client provides or 0 if empty @@ -190,6 +193,7 @@ impl ImportCommand { header_downloader, body_downloader, factory.clone(), + etl_file_size, ) .set(SenderRecoveryStage { commit_threshold: config.stages.sender_recovery.commit_threshold, diff --git a/bin/reth/src/commands/stage/run.rs b/bin/reth/src/commands/stage/run.rs index 8ea453d29..42747f75c 100644 --- a/bin/reth/src/commands/stage/run.rs +++ b/bin/reth/src/commands/stage/run.rs @@ -82,6 +82,10 @@ pub struct Command { #[arg(long)] batch_size: Option, + /// The maximum size in bytes of data held in memory before being flushed to disk as a file. + #[arg(long)] + etl_file_size: Option, + /// Normally, running the stage requires unwinding for stages that already /// have been run, in order to not rewrite to the same database slots. /// @@ -151,6 +155,8 @@ impl Command { let batch_size = self.batch_size.unwrap_or(self.to - self.from + 1); + let etl_file_size = self.etl_file_size.unwrap_or(500 * 1024 * 1024); + let (mut exec_stage, mut unwind_stage): (Box>, Option>>) = match self.stage { StageEnum::Bodies => { @@ -229,7 +235,7 @@ impl Command { ) } StageEnum::TxLookup => { - (Box::new(TransactionLookupStage::new(batch_size, None)), None) + (Box::new(TransactionLookupStage::new(batch_size, etl_file_size, None)), None) } StageEnum::AccountHashing => { (Box::new(AccountHashingStage::new(1, batch_size)), None) diff --git a/book/cli/reth/node.md b/book/cli/reth/node.md index 99de6a7b2..961f71592 100644 --- a/book/cli/reth/node.md +++ b/book/cli/reth/node.md @@ -238,7 +238,7 @@ RPC: --rpc-max-tracing-requests Maximum number of concurrent tracing requests - [default: 8] + [default: 10] --rpc-max-blocks-per-filter Maximum number of blocks that could be scanned per filter request. (0 = entire chain) diff --git a/book/cli/reth/stage/run.md b/book/cli/reth/stage/run.md index 246969427..a56964a7a 100644 --- a/book/cli/reth/stage/run.md +++ b/book/cli/reth/stage/run.md @@ -61,6 +61,9 @@ Options: --batch-size Batch size for stage execution and unwind + --etl-file-size + Size for temporary file during ETL stages + -s, --skip-unwind Normally, running the stage requires unwinding for stages that already have been run, in order to not rewrite to the same database slots. diff --git a/book/run/config.md b/book/run/config.md index 9c770dae7..ec559e901 100644 --- a/book/run/config.md +++ b/book/run/config.md @@ -221,6 +221,19 @@ The storage history indexing stage builds an index of what blocks a particular s commit_threshold = 100000 ``` +### `etl` + +An ETL (extract, transform, load) data collector. Used mainly to insert data into `MDBX` in a sorted manner. + +```toml +[stages.etl] +# The maximum size in bytes of data held in memory before being flushed to disk as a file. +# +# Lower threshold corresponds to more frequent flushes, +# but lowers temporary storage usage +file_size = 524_288_000 # 500 * 1024 * 1024 +``` + ## The `[peers]` section The peers section is used to configure how the networking component of reth establishes and maintains connections to peers. diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs index cc3741fc5..2ddac702b 100644 --- a/crates/config/src/config.rs +++ b/crates/config/src/config.rs @@ -70,6 +70,8 @@ pub struct StageConfig { pub index_account_history: IndexHistoryConfig, /// Index Storage History stage configuration. pub index_storage_history: IndexHistoryConfig, + /// Common ETL related configuration. + pub etl: EtlConfig, } /// Header stage configuration. @@ -235,7 +237,21 @@ impl Default for TransactionLookupConfig { } } -/// History History stage configuration. +/// Common ETL related configuration. +#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)] +#[serde(default)] +pub struct EtlConfig { + /// The maximum size in bytes of data held in memory before being flushed to disk as a file. + pub etl_file_size: usize, +} + +impl Default for EtlConfig { + fn default() -> Self { + Self { etl_file_size: 500 * (1024 * 1024) } + } +} + +/// History stage configuration. #[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)] #[serde(default)] pub struct IndexHistoryConfig { diff --git a/crates/consensus/beacon/src/engine/test_utils.rs b/crates/consensus/beacon/src/engine/test_utils.rs index bae48a7ce..6a5df757a 100644 --- a/crates/consensus/beacon/src/engine/test_utils.rs +++ b/crates/consensus/beacon/src/engine/test_utils.rs @@ -406,6 +406,7 @@ where header_downloader, body_downloader, executor_factory.clone(), + 500 * (1024 * 1024), )) } }; diff --git a/crates/node-core/src/node_config.rs b/crates/node-core/src/node_config.rs index 08e59655c..1424c30e9 100644 --- a/crates/node-core/src/node_config.rs +++ b/crates/node-core/src/node_config.rs @@ -837,6 +837,7 @@ impl NodeConfig { header_downloader, body_downloader, factory.clone(), + stage_config.etl.etl_file_size, ) .set(SenderRecoveryStage { commit_threshold: stage_config.sender_recovery.commit_threshold, @@ -870,6 +871,7 @@ impl NodeConfig { .set(MerkleStage::new_execution(stage_config.merkle.clean_threshold)) .set(TransactionLookupStage::new( stage_config.transaction_lookup.chunk_size, + stage_config.etl.etl_file_size, prune_modes.transaction_lookup, )) .set(IndexAccountHistoryStage::new( diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index eb668ab74..cdeb8b535 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -57,7 +57,7 @@ fn transaction_lookup(c: &mut Criterion) { let mut group = c.benchmark_group("Stages"); // don't need to run each stage for that many times group.sample_size(10); - let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, None); + let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, 500 * 1024 * 1024, None); let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS); diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index 568a63bba..785bada6c 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -59,6 +59,7 @@ //! headers_downloader, //! bodies_downloader, //! executor_factory, +//! 500*1024*1024, //! ) //! ) //! .build(provider_factory, static_file_producer); diff --git a/crates/stages/src/sets.rs b/crates/stages/src/sets.rs index 30fc169ac..b70674096 100644 --- a/crates/stages/src/sets.rs +++ b/crates/stages/src/sets.rs @@ -100,6 +100,7 @@ impl DefaultStages { header_downloader: H, body_downloader: B, executor_factory: EF, + etl_file_size: usize, ) -> Self where EF: ExecutorFactory, @@ -111,6 +112,7 @@ impl DefaultStages { consensus, header_downloader, body_downloader, + etl_file_size, ), executor_factory, } @@ -162,6 +164,8 @@ pub struct OnlineStages { header_downloader: H, /// The block body downloader body_downloader: B, + /// The size of temporary files in bytes for ETL data collector. + etl_file_size: usize, } impl OnlineStages { @@ -172,8 +176,9 @@ impl OnlineStages { consensus: Arc, header_downloader: H, body_downloader: B, + etl_file_size: usize, ) -> Self { - Self { provider, header_mode, consensus, header_downloader, body_downloader } + Self { provider, header_mode, consensus, header_downloader, body_downloader, etl_file_size } } } @@ -198,9 +203,16 @@ where mode: HeaderSyncMode, header_downloader: H, consensus: Arc, + etl_file_size: usize, ) -> StageSetBuilder { StageSetBuilder::default() - .add_stage(HeaderStage::new(provider, header_downloader, mode, consensus.clone())) + .add_stage(HeaderStage::new( + provider, + header_downloader, + mode, + consensus.clone(), + etl_file_size, + )) .add_stage(bodies) } } @@ -219,6 +231,7 @@ where self.header_downloader, self.header_mode, self.consensus.clone(), + self.etl_file_size, )) .add_stage(BodyStage::new(self.body_downloader)) } diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index af8c1311e..2dc39306e 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -74,6 +74,7 @@ where downloader: Downloader, mode: HeaderSyncMode, consensus: Arc, + etl_file_size: usize, ) -> Self { Self { provider: database, @@ -81,8 +82,8 @@ where mode, consensus, sync_gap: None, - hash_collector: Collector::new(100 * (1024 * 1024)), - header_collector: Collector::new(100 * (1024 * 1024)), + hash_collector: Collector::new(etl_file_size / 2), + header_collector: Collector::new(etl_file_size / 2), is_etl_ready: false, } } @@ -419,6 +420,7 @@ mod tests { (*self.downloader_factory)(), HeaderSyncMode::Tip(self.channel.1.clone()), self.consensus.clone(), + 500 * (1024 * 1024), ) } } diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 64b72431b..fdc673c76 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -32,19 +32,20 @@ pub struct TransactionLookupStage { /// The maximum number of lookup entries to hold in memory before pushing them to /// [`reth_etl::Collector`]. chunk_size: u64, + etl_file_size: usize, prune_mode: Option, } impl Default for TransactionLookupStage { fn default() -> Self { - Self { chunk_size: 5_000_000, prune_mode: None } + Self { chunk_size: 5_000_000, etl_file_size: 500 * 1024 * 1024, prune_mode: None } } } impl TransactionLookupStage { /// Create new instance of [TransactionLookupStage]. - pub fn new(chunk_size: u64, prune_mode: Option) -> Self { - Self { chunk_size, prune_mode } + pub fn new(chunk_size: u64, etl_file_size: usize, prune_mode: Option) -> Self { + Self { chunk_size, etl_file_size, prune_mode } } } @@ -99,7 +100,7 @@ impl Stage for TransactionLookupStage { } // 500MB temporary files - let mut hash_collector: Collector = Collector::new(500 * (1024 * 1024)); + let mut hash_collector: Collector = Collector::new(self.etl_file_size); debug!( target: "sync::stages::transaction_lookup", @@ -397,12 +398,18 @@ mod tests { struct TransactionLookupTestRunner { db: TestStageDB, chunk_size: u64, + etl_file_size: usize, prune_mode: Option, } impl Default for TransactionLookupTestRunner { fn default() -> Self { - Self { db: TestStageDB::default(), chunk_size: 1000, prune_mode: None } + Self { + db: TestStageDB::default(), + chunk_size: 1000, + etl_file_size: 500 * 1024 * 1024, + prune_mode: None, + } } } @@ -449,7 +456,11 @@ mod tests { } fn stage(&self) -> Self::S { - TransactionLookupStage { chunk_size: self.chunk_size, prune_mode: self.prune_mode } + TransactionLookupStage { + chunk_size: self.chunk_size, + etl_file_size: self.etl_file_size, + prune_mode: self.prune_mode, + } } }