From 6bb94af5bbf928f9a706fef1c23fb5dcdcfcba65 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 1 Sep 2023 15:06:49 +0100 Subject: [PATCH] feat(stages): respect `PruneModes` in `TxLookup` stage (#4390) --- Cargo.lock | 8 +- bin/reth/src/node/mod.rs | 5 +- bin/reth/src/stage/run.rs | 6 +- crates/config/src/config.rs | 2 +- .../consensus/beacon/src/engine/test_utils.rs | 2 +- crates/prune/src/pruner.rs | 2 +- crates/stages/benches/criterion.rs | 4 +- crates/stages/src/stages/execution.rs | 2 +- crates/stages/src/stages/tx_lookup.rs | 131 +++++++++++++++--- 9 files changed, 133 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5c8386b30..f2df57294 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6188,7 +6188,7 @@ dependencies = [ [[package]] name = "revm" version = "3.3.0" -source = "git+https://github.com/bluealloy/revm/?branch=release/v25#6084e0fa2d457931cd8c9d29934bca0812b5b8d6" +source = "git+https://github.com/bluealloy/revm/?branch=release/v25#88337924f4d16ed1f5e4cde12a03d0cb755cd658" dependencies = [ "auto_impl", "revm-interpreter", @@ -6198,7 +6198,7 @@ dependencies = [ [[package]] name = "revm-interpreter" version = "1.1.2" -source = "git+https://github.com/bluealloy/revm/?branch=release/v25#6084e0fa2d457931cd8c9d29934bca0812b5b8d6" +source = "git+https://github.com/bluealloy/revm/?branch=release/v25#88337924f4d16ed1f5e4cde12a03d0cb755cd658" dependencies = [ "derive_more", "enumn", @@ -6209,7 +6209,7 @@ dependencies = [ [[package]] name = "revm-precompile" version = "2.0.3" -source = "git+https://github.com/bluealloy/revm/?branch=release/v25#6084e0fa2d457931cd8c9d29934bca0812b5b8d6" +source = "git+https://github.com/bluealloy/revm/?branch=release/v25#88337924f4d16ed1f5e4cde12a03d0cb755cd658" dependencies = [ "k256", "num", @@ -6225,7 +6225,7 @@ dependencies = [ [[package]] name = "revm-primitives" version = "1.1.2" -source = "git+https://github.com/bluealloy/revm/?branch=release/v25#6084e0fa2d457931cd8c9d29934bca0812b5b8d6" +source = "git+https://github.com/bluealloy/revm/?branch=release/v25#88337924f4d16ed1f5e4cde12a03d0cb755cd658" dependencies = [ "arbitrary", "auto_impl", diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 58d7bccb8..cd42ee81f 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -865,7 +865,10 @@ impl NodeCommand { stage_config.storage_hashing.commit_threshold, )) .set(MerkleStage::new_execution(stage_config.merkle.clean_threshold)) - .set(TransactionLookupStage::new(stage_config.transaction_lookup.commit_threshold)) + .set(TransactionLookupStage::new( + stage_config.transaction_lookup.commit_threshold, + prune_modes.clone(), + )) .set(IndexAccountHistoryStage::new( stage_config.index_account_history.commit_threshold, prune_modes.clone(), diff --git a/bin/reth/src/stage/run.rs b/bin/reth/src/stage/run.rs index 0ec5d7d4f..079db6783 100644 --- a/bin/reth/src/stage/run.rs +++ b/bin/reth/src/stage/run.rs @@ -12,7 +12,7 @@ use reth_beacon_consensus::BeaconConsensus; use reth_config::Config; use reth_db::init_db; use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; -use reth_primitives::ChainSpec; +use reth_primitives::{ChainSpec, PruneModes}; use reth_provider::{ProviderFactory, StageCheckpointReader}; use reth_stages::{ stages::{ @@ -208,7 +208,9 @@ impl Command { None, ) } - StageEnum::TxLookup => (Box::new(TransactionLookupStage::new(batch_size)), None), + StageEnum::TxLookup => { + (Box::new(TransactionLookupStage::new(batch_size, PruneModes::none())), None) + } StageEnum::AccountHashing => { (Box::new(AccountHashingStage::new(1, batch_size)), None) } diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs index 15dcb42f1..95f92c9a5 100644 --- a/crates/config/src/config.rs +++ b/crates/config/src/config.rs @@ -290,7 +290,7 @@ pub struct PruneConfig { impl Default for PruneConfig { fn default() -> Self { - Self { block_interval: 5, parts: PruneModes::default() } + Self { block_interval: 5, parts: PruneModes::none() } } } diff --git a/crates/consensus/beacon/src/engine/test_utils.rs b/crates/consensus/beacon/src/engine/test_utils.rs index dc963c3bd..d3959f449 100644 --- a/crates/consensus/beacon/src/engine/test_utils.rs +++ b/crates/consensus/beacon/src/engine/test_utils.rs @@ -474,7 +474,7 @@ where db.clone(), self.base_config.chain_spec.clone(), 5, - PruneModes::default(), + PruneModes::none(), PruneBatchSizes::default(), ); diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index df2e8ce63..7d91f9535 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -922,7 +922,7 @@ mod tests { fn is_pruning_needed() { let db = create_test_rw_db(); let pruner = - Pruner::new(db, MAINNET.clone(), 5, PruneModes::default(), PruneBatchSizes::default()); + Pruner::new(db, MAINNET.clone(), 5, PruneModes::none(), PruneBatchSizes::default()); // No last pruned block number was set before let first_block_number = 1; diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index 8fce2e370..aae96c772 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -5,7 +5,7 @@ use criterion::{ use pprof::criterion::{Output, PProfProfiler}; use reth_db::DatabaseEnv; use reth_interfaces::test_utils::TestConsensus; -use reth_primitives::{stage::StageCheckpoint, MAINNET}; +use reth_primitives::{stage::StageCheckpoint, PruneModes, MAINNET}; use reth_provider::ProviderFactory; use reth_stages::{ stages::{MerkleStage, SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage}, @@ -62,7 +62,7 @@ fn transaction_lookup(c: &mut Criterion) { let mut group = c.benchmark_group("Stages"); // don't need to run each stage for that many times group.sample_size(10); - let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS); + let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, PruneModes::none()); measure_stage( &mut group, diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index cf688f352..c8dfed70a 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -94,7 +94,7 @@ impl ExecutionStage { executor_factory, ExecutionStageThresholds::default(), MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD, - PruneModes::default(), + PruneModes::none(), ) } diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 65f5772b7..43700ab3d 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -8,12 +8,15 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, DatabaseError, }; +use reth_interfaces::provider::ProviderError; use reth_primitives::{ keccak256, stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, - PrunePart, TransactionSignedNoHash, TxNumber, H256, + PruneCheckpoint, PruneModes, PrunePart, TransactionSignedNoHash, TxNumber, H256, +}; +use reth_provider::{ + BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter, }; -use reth_provider::{DatabaseProviderRW, PruneCheckpointReader}; use tokio::sync::mpsc; use tracing::*; @@ -26,18 +29,19 @@ use tracing::*; pub struct TransactionLookupStage { /// The number of lookup entries to commit at once commit_threshold: u64, + prune_modes: PruneModes, } impl Default for TransactionLookupStage { fn default() -> Self { - Self { commit_threshold: 5_000_000 } + Self { commit_threshold: 5_000_000, prune_modes: PruneModes::none() } } } impl TransactionLookupStage { /// Create new instance of [TransactionLookupStage]. - pub fn new(commit_threshold: u64) -> Self { - Self { commit_threshold } + pub fn new(commit_threshold: u64, prune_modes: PruneModes) -> Self { + Self { commit_threshold, prune_modes } } } @@ -52,11 +56,37 @@ impl Stage for TransactionLookupStage { async fn execute( &mut self, provider: &DatabaseProviderRW<'_, &DB>, - input: ExecInput, + mut input: ExecInput, ) -> Result { + if let Some((target_prunable_block, prune_mode)) = + self.prune_modes.prune_target_block_transaction_lookup(input.target())? + { + if target_prunable_block > input.checkpoint().block_number { + input.checkpoint = Some(StageCheckpoint::new(target_prunable_block)); + + // Save prune checkpoint only if we don't have one already. + // Otherwise, pruner may skip the unpruned range of blocks. + if provider.get_prune_checkpoint(PrunePart::TransactionLookup)?.is_none() { + let target_prunable_tx_number = provider + .block_body_indices(target_prunable_block)? + .ok_or(ProviderError::BlockBodyIndicesNotFound(target_prunable_block))? + .last_tx_num(); + + provider.save_prune_checkpoint( + PrunePart::TransactionLookup, + PruneCheckpoint { + block_number: Some(target_prunable_block), + tx_number: Some(target_prunable_tx_number), + prune_mode, + }, + )?; + } + } + } if input.target_reached() { return Ok(ExecOutput::done(input.checkpoint())) } + let (tx_range, block_range, is_final_range) = input.next_block_range_with_transaction_threshold(provider, self.commit_threshold)?; let end_block = *block_range.end(); @@ -187,6 +217,8 @@ fn stage_checkpoint( let pruned_entries = provider .get_prune_checkpoint(PrunePart::TransactionLookup)? .and_then(|checkpoint| checkpoint.tx_number) + // `+1` is needed because `TxNumber` is 0-indexed + .map(|tx_number| tx_number + 1) .unwrap_or_default(); Ok(EntitiesCheckpoint { // If `TxHashNumber` table was pruned, we will have a number of entries in it not matching @@ -216,6 +248,7 @@ mod tests { use reth_provider::{ BlockReader, ProviderError, ProviderFactory, PruneCheckpointWriter, TransactionsProvider, }; + use std::ops::Sub; // Implement stage test suite. stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup); @@ -253,7 +286,8 @@ mod tests { let result = rx.await.unwrap(); assert_matches!( result, - Ok(ExecOutput {checkpoint: StageCheckpoint { + Ok(ExecOutput { + checkpoint: StageCheckpoint { block_number, stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { processed, @@ -272,7 +306,7 @@ mod tests { async fn execute_intermediate_commit_transaction_lookup() { let threshold = 50; let mut runner = TransactionLookupTestRunner::default(); - runner.set_threshold(threshold); + runner.set_commit_threshold(threshold); let (stage_progress, previous_stage) = (1000, 1100); // input exceeds threshold let first_input = ExecInput { target: Some(previous_stage), @@ -313,7 +347,7 @@ mod tests { ); // Execute second time to completion - runner.set_threshold(u64::MAX); + runner.set_commit_threshold(u64::MAX); let second_input = ExecInput { target: Some(previous_stage), checkpoint: Some(StageCheckpoint::new(expected_progress)), @@ -333,6 +367,49 @@ mod tests { assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed"); } + #[tokio::test] + async fn execute_pruned_transaction_lookup() { + let (previous_stage, prune_target, stage_progress) = (500, 400, 100); + let mut rng = generators::rng(); + + // Set up the runner + let mut runner = TransactionLookupTestRunner::default(); + let input = ExecInput { + target: Some(previous_stage), + checkpoint: Some(StageCheckpoint::new(stage_progress)), + }; + + // Seed only once with full input range + let seed = + random_block_range(&mut rng, stage_progress + 1..=previous_stage, H256::zero(), 0..2); + runner.tx.insert_blocks(seed.iter(), None).expect("failed to seed execution"); + + runner.set_prune_modes(PruneModes { + transaction_lookup: Some(PruneMode::Before(prune_target)), + ..Default::default() + }); + + let rx = runner.execute(input); + + // Assert the successful result + let result = rx.await.unwrap(); + assert_matches!( + result, + Ok(ExecOutput { + checkpoint: StageCheckpoint { + block_number, + stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { + processed, + total + })) + }, done: true }) if block_number == previous_stage && processed == total && + total == runner.tx.table::().unwrap().len() as u64 + ); + + // Validate the stage execution + assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation"); + } + #[test] fn stage_checkpoint_pruned() { let tx = TestTransaction::default(); @@ -366,7 +443,8 @@ mod tests { blocks[..=max_pruned_block as usize] .iter() .map(|block| block.body.len() as u64) - .sum::(), + .sum::() + .sub(1), // `TxNumber` is 0-indexed ), prune_mode: PruneMode::Full, }, @@ -392,18 +470,27 @@ mod tests { struct TransactionLookupTestRunner { tx: TestTransaction, - threshold: u64, + commit_threshold: u64, + prune_modes: PruneModes, } impl Default for TransactionLookupTestRunner { fn default() -> Self { - Self { threshold: 1000, tx: TestTransaction::default() } + Self { + tx: TestTransaction::default(), + commit_threshold: 1000, + prune_modes: PruneModes::none(), + } } } impl TransactionLookupTestRunner { - fn set_threshold(&mut self, threshold: u64) { - self.threshold = threshold; + fn set_commit_threshold(&mut self, threshold: u64) { + self.commit_threshold = threshold; + } + + fn set_prune_modes(&mut self, prune_modes: PruneModes) { + self.prune_modes = prune_modes; } /// # Panics @@ -441,7 +528,10 @@ mod tests { } fn stage(&self) -> Self::S { - TransactionLookupStage { commit_threshold: self.threshold } + TransactionLookupStage { + commit_threshold: self.commit_threshold, + prune_modes: self.prune_modes.clone(), + } } } @@ -460,13 +550,22 @@ mod tests { fn validate_execution( &self, - input: ExecInput, + mut input: ExecInput, output: Option, ) -> Result<(), TestRunnerError> { match output { Some(output) => { let provider = self.tx.inner(); + if let Some((target_prunable_block, _)) = self + .prune_modes + .prune_target_block_transaction_lookup(input.target()) + .expect("prune target block for transaction lookup") + { + if target_prunable_block > input.checkpoint().block_number { + input.checkpoint = Some(StageCheckpoint::new(target_prunable_block)); + } + } let start_block = input.next_block(); let end_block = output.checkpoint.block_number;