feat(stages): respect PruneModes in TxLookup stage (#4390)

This commit is contained in:
Alexey Shekhirin
2023-09-01 15:06:49 +01:00
committed by GitHub
parent e66e3e3556
commit 6bb94af5bb
9 changed files with 133 additions and 29 deletions

8
Cargo.lock generated
View File

@ -6188,7 +6188,7 @@ dependencies = [
[[package]]
name = "revm"
version = "3.3.0"
source = "git+https://github.com/bluealloy/revm/?branch=release/v25#6084e0fa2d457931cd8c9d29934bca0812b5b8d6"
source = "git+https://github.com/bluealloy/revm/?branch=release/v25#88337924f4d16ed1f5e4cde12a03d0cb755cd658"
dependencies = [
"auto_impl",
"revm-interpreter",
@ -6198,7 +6198,7 @@ dependencies = [
[[package]]
name = "revm-interpreter"
version = "1.1.2"
source = "git+https://github.com/bluealloy/revm/?branch=release/v25#6084e0fa2d457931cd8c9d29934bca0812b5b8d6"
source = "git+https://github.com/bluealloy/revm/?branch=release/v25#88337924f4d16ed1f5e4cde12a03d0cb755cd658"
dependencies = [
"derive_more",
"enumn",
@ -6209,7 +6209,7 @@ dependencies = [
[[package]]
name = "revm-precompile"
version = "2.0.3"
source = "git+https://github.com/bluealloy/revm/?branch=release/v25#6084e0fa2d457931cd8c9d29934bca0812b5b8d6"
source = "git+https://github.com/bluealloy/revm/?branch=release/v25#88337924f4d16ed1f5e4cde12a03d0cb755cd658"
dependencies = [
"k256",
"num",
@ -6225,7 +6225,7 @@ dependencies = [
[[package]]
name = "revm-primitives"
version = "1.1.2"
source = "git+https://github.com/bluealloy/revm/?branch=release/v25#6084e0fa2d457931cd8c9d29934bca0812b5b8d6"
source = "git+https://github.com/bluealloy/revm/?branch=release/v25#88337924f4d16ed1f5e4cde12a03d0cb755cd658"
dependencies = [
"arbitrary",
"auto_impl",

View File

@ -865,7 +865,10 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
stage_config.storage_hashing.commit_threshold,
))
.set(MerkleStage::new_execution(stage_config.merkle.clean_threshold))
.set(TransactionLookupStage::new(stage_config.transaction_lookup.commit_threshold))
.set(TransactionLookupStage::new(
stage_config.transaction_lookup.commit_threshold,
prune_modes.clone(),
))
.set(IndexAccountHistoryStage::new(
stage_config.index_account_history.commit_threshold,
prune_modes.clone(),

View File

@ -12,7 +12,7 @@ use reth_beacon_consensus::BeaconConsensus;
use reth_config::Config;
use reth_db::init_db;
use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;
use reth_primitives::ChainSpec;
use reth_primitives::{ChainSpec, PruneModes};
use reth_provider::{ProviderFactory, StageCheckpointReader};
use reth_stages::{
stages::{
@ -208,7 +208,9 @@ impl Command {
None,
)
}
StageEnum::TxLookup => (Box::new(TransactionLookupStage::new(batch_size)), None),
StageEnum::TxLookup => {
(Box::new(TransactionLookupStage::new(batch_size, PruneModes::none())), None)
}
StageEnum::AccountHashing => {
(Box::new(AccountHashingStage::new(1, batch_size)), None)
}

View File

@ -290,7 +290,7 @@ pub struct PruneConfig {
impl Default for PruneConfig {
fn default() -> Self {
Self { block_interval: 5, parts: PruneModes::default() }
Self { block_interval: 5, parts: PruneModes::none() }
}
}

View File

@ -474,7 +474,7 @@ where
db.clone(),
self.base_config.chain_spec.clone(),
5,
PruneModes::default(),
PruneModes::none(),
PruneBatchSizes::default(),
);

View File

@ -922,7 +922,7 @@ mod tests {
fn is_pruning_needed() {
let db = create_test_rw_db();
let pruner =
Pruner::new(db, MAINNET.clone(), 5, PruneModes::default(), PruneBatchSizes::default());
Pruner::new(db, MAINNET.clone(), 5, PruneModes::none(), PruneBatchSizes::default());
// No last pruned block number was set before
let first_block_number = 1;

View File

@ -5,7 +5,7 @@ use criterion::{
use pprof::criterion::{Output, PProfProfiler};
use reth_db::DatabaseEnv;
use reth_interfaces::test_utils::TestConsensus;
use reth_primitives::{stage::StageCheckpoint, MAINNET};
use reth_primitives::{stage::StageCheckpoint, PruneModes, MAINNET};
use reth_provider::ProviderFactory;
use reth_stages::{
stages::{MerkleStage, SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage},
@ -62,7 +62,7 @@ fn transaction_lookup(c: &mut Criterion) {
let mut group = c.benchmark_group("Stages");
// don't need to run each stage for that many times
group.sample_size(10);
let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS);
let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, PruneModes::none());
measure_stage(
&mut group,

View File

@ -94,7 +94,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
executor_factory,
ExecutionStageThresholds::default(),
MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
PruneModes::default(),
PruneModes::none(),
)
}

View File

@ -8,12 +8,15 @@ use reth_db::{
transaction::{DbTx, DbTxMut},
DatabaseError,
};
use reth_interfaces::provider::ProviderError;
use reth_primitives::{
keccak256,
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
PrunePart, TransactionSignedNoHash, TxNumber, H256,
PruneCheckpoint, PruneModes, PrunePart, TransactionSignedNoHash, TxNumber, H256,
};
use reth_provider::{
BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter,
};
use reth_provider::{DatabaseProviderRW, PruneCheckpointReader};
use tokio::sync::mpsc;
use tracing::*;
@ -26,18 +29,19 @@ use tracing::*;
pub struct TransactionLookupStage {
/// The number of lookup entries to commit at once
commit_threshold: u64,
/// Prune configuration; `execute` queries it for the target prunable block of the
/// transaction lookup part before processing any blocks.
prune_modes: PruneModes,
}
impl Default for TransactionLookupStage {
fn default() -> Self {
Self { commit_threshold: 5_000_000 }
Self { commit_threshold: 5_000_000, prune_modes: PruneModes::none() }
}
}
impl TransactionLookupStage {
/// Create new instance of [TransactionLookupStage].
pub fn new(commit_threshold: u64) -> Self {
Self { commit_threshold }
pub fn new(commit_threshold: u64, prune_modes: PruneModes) -> Self {
Self { commit_threshold, prune_modes }
}
}
@ -52,11 +56,37 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
async fn execute(
&mut self,
provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput,
mut input: ExecInput,
) -> Result<ExecOutput, StageError> {
if let Some((target_prunable_block, prune_mode)) =
self.prune_modes.prune_target_block_transaction_lookup(input.target())?
{
if target_prunable_block > input.checkpoint().block_number {
input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
// Save prune checkpoint only if we don't have one already.
// Otherwise, pruner may skip the unpruned range of blocks.
if provider.get_prune_checkpoint(PrunePart::TransactionLookup)?.is_none() {
let target_prunable_tx_number = provider
.block_body_indices(target_prunable_block)?
.ok_or(ProviderError::BlockBodyIndicesNotFound(target_prunable_block))?
.last_tx_num();
provider.save_prune_checkpoint(
PrunePart::TransactionLookup,
PruneCheckpoint {
block_number: Some(target_prunable_block),
tx_number: Some(target_prunable_tx_number),
prune_mode,
},
)?;
}
}
}
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
let (tx_range, block_range, is_final_range) =
input.next_block_range_with_transaction_threshold(provider, self.commit_threshold)?;
let end_block = *block_range.end();
@ -187,6 +217,8 @@ fn stage_checkpoint<DB: Database>(
let pruned_entries = provider
.get_prune_checkpoint(PrunePart::TransactionLookup)?
.and_then(|checkpoint| checkpoint.tx_number)
// `+1` is needed because `TxNumber` is 0-indexed
.map(|tx_number| tx_number + 1)
.unwrap_or_default();
Ok(EntitiesCheckpoint {
// If `TxHashNumber` table was pruned, we will have a number of entries in it not matching
@ -216,6 +248,7 @@ mod tests {
use reth_provider::{
BlockReader, ProviderError, ProviderFactory, PruneCheckpointWriter, TransactionsProvider,
};
use std::ops::Sub;
// Implement stage test suite.
stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);
@ -253,7 +286,8 @@ mod tests {
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput {checkpoint: StageCheckpoint {
Ok(ExecOutput {
checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
@ -272,7 +306,7 @@ mod tests {
async fn execute_intermediate_commit_transaction_lookup() {
let threshold = 50;
let mut runner = TransactionLookupTestRunner::default();
runner.set_threshold(threshold);
runner.set_commit_threshold(threshold);
let (stage_progress, previous_stage) = (1000, 1100); // input exceeds threshold
let first_input = ExecInput {
target: Some(previous_stage),
@ -313,7 +347,7 @@ mod tests {
);
// Execute second time to completion
runner.set_threshold(u64::MAX);
runner.set_commit_threshold(u64::MAX);
let second_input = ExecInput {
target: Some(previous_stage),
checkpoint: Some(StageCheckpoint::new(expected_progress)),
@ -333,6 +367,49 @@ mod tests {
assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed");
}
/// Executes the stage with `PruneMode::Before` set for transaction lookup and expects it to
/// finish at the target block with every transaction accounted for in the checkpoint.
#[tokio::test]
async fn execute_pruned_transaction_lookup() {
    let previous_stage = 500;
    let prune_target = 400;
    let stage_progress = 100;
    let mut rng = generators::rng();

    // Prepare the runner and the input describing the range to process
    let mut runner = TransactionLookupTestRunner::default();
    let input = ExecInput {
        target: Some(previous_stage),
        checkpoint: Some(StageCheckpoint::new(stage_progress)),
    };

    // Populate the database a single time, covering the whole input range
    let blocks = random_block_range(
        &mut rng,
        stage_progress + 1..=previous_stage,
        H256::zero(),
        0..2,
    );
    runner.tx.insert_blocks(blocks.iter(), None).expect("failed to seed execution");

    // Enable pruning of transaction lookup entries before `prune_target`
    let prune_modes = PruneModes {
        transaction_lookup: Some(PruneMode::Before(prune_target)),
        ..Default::default()
    };
    runner.set_prune_modes(prune_modes);

    let pending = runner.execute(input);

    // The stage must report completion at the target with `processed == total`
    let result = pending.await.unwrap();
    assert_matches!(
        result,
        Ok(ExecOutput {
            checkpoint: StageCheckpoint {
                block_number,
                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                    processed,
                    total
                }))
            },
            done: true
        }) if block_number == previous_stage &&
            processed == total &&
            total == runner.tx.table::<tables::Transactions>().unwrap().len() as u64
    );

    // Cross-check the database contents against the executed range
    let validation = runner.validate_execution(input, result.ok());
    assert!(validation.is_ok(), "execution validation");
}
#[test]
fn stage_checkpoint_pruned() {
let tx = TestTransaction::default();
@ -366,7 +443,8 @@ mod tests {
blocks[..=max_pruned_block as usize]
.iter()
.map(|block| block.body.len() as u64)
.sum::<u64>(),
.sum::<u64>()
.sub(1), // `TxNumber` is 0-indexed
),
prune_mode: PruneMode::Full,
},
@ -392,18 +470,27 @@ mod tests {
struct TransactionLookupTestRunner {
tx: TestTransaction,
threshold: u64,
commit_threshold: u64,
prune_modes: PruneModes,
}
impl Default for TransactionLookupTestRunner {
fn default() -> Self {
Self { threshold: 1000, tx: TestTransaction::default() }
Self {
tx: TestTransaction::default(),
commit_threshold: 1000,
prune_modes: PruneModes::none(),
}
}
}
impl TransactionLookupTestRunner {
fn set_threshold(&mut self, threshold: u64) {
self.threshold = threshold;
fn set_commit_threshold(&mut self, threshold: u64) {
self.commit_threshold = threshold;
}
/// Overrides the prune modes that `stage()` clones into the stage under test.
fn set_prune_modes(&mut self, prune_modes: PruneModes) {
self.prune_modes = prune_modes;
}
/// # Panics
@ -441,7 +528,10 @@ mod tests {
}
fn stage(&self) -> Self::S {
TransactionLookupStage { commit_threshold: self.threshold }
TransactionLookupStage {
commit_threshold: self.commit_threshold,
prune_modes: self.prune_modes.clone(),
}
}
}
@ -460,13 +550,22 @@ mod tests {
fn validate_execution(
&self,
input: ExecInput,
mut input: ExecInput,
output: Option<ExecOutput>,
) -> Result<(), TestRunnerError> {
match output {
Some(output) => {
let provider = self.tx.inner();
if let Some((target_prunable_block, _)) = self
.prune_modes
.prune_target_block_transaction_lookup(input.target())
.expect("prune target block for transaction lookup")
{
if target_prunable_block > input.checkpoint().block_number {
input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
}
}
let start_block = input.next_block();
let end_block = output.checkpoint.block_number;