fix(stages): disable index history stages checkpoints (#3178)

This commit is contained in:
Alexey Shekhirin
2023-06-15 18:06:45 +01:00
committed by GitHub
parent f25fcca33e
commit 32e642d6b0
2 changed files with 14 additions and 427 deletions

View File

@ -1,13 +1,8 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx, DatabaseError};
use reth_primitives::{
stage::{
CheckpointBlockRange, EntitiesCheckpoint, IndexHistoryCheckpoint, StageCheckpoint, StageId,
},
BlockNumber,
};
use reth_db::database::Database;
use reth_primitives::stage::{StageCheckpoint, StageId};
use reth_provider::DatabaseProviderRW;
use std::{fmt::Debug, ops::RangeInclusive};
use std::fmt::Debug;
/// Stage is indexing history the account changesets generated in
/// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information
@ -44,27 +39,11 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
let mut stage_checkpoint = stage_checkpoint(
provider,
input.checkpoint(),
// It is important to provide the full block range into the checkpoint,
// not the one accounting for commit threshold, to get the correct range end.
&input.next_block_range(),
)?;
let indices = provider.get_account_transition_ids_from_changeset(range.clone())?;
let changesets = indices.values().map(|blocks| blocks.len() as u64).sum::<u64>();
// Insert changeset to history index
provider.insert_account_history_index(indices)?;
stage_checkpoint.progress.processed += changesets;
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(*range.end())
.with_index_history_stage_checkpoint(stage_checkpoint),
done: is_final_range,
})
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: is_final_range })
}
/// Unwind the stage.
@ -76,70 +55,15 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
let (range, unwind_progress, _) =
input.unwind_block_range_with_threshold(self.commit_threshold);
let changesets = provider.unwind_account_history_indices(range)?;
let checkpoint =
if let Some(mut stage_checkpoint) = input.checkpoint.index_history_stage_checkpoint() {
stage_checkpoint.progress.processed -= changesets as u64;
StageCheckpoint::new(unwind_progress)
.with_index_history_stage_checkpoint(stage_checkpoint)
} else {
StageCheckpoint::new(unwind_progress)
};
provider.unwind_account_history_indices(range)?;
// from HistoryIndex higher than that number.
Ok(UnwindOutput { checkpoint })
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
}
}
/// The function proceeds as follows:
/// 1. It first checks if the checkpoint has an [IndexHistoryCheckpoint] that matches the given
/// block range. If it does, the function returns that checkpoint.
/// 2. If the checkpoint's block range end matches the current checkpoint's block number, it creates
/// a new [IndexHistoryCheckpoint] with the given block range and updates the progress with the
/// current progress.
/// 3. If none of the above conditions are met, it creates a new [IndexHistoryCheckpoint] with the
/// given block range and calculates the progress by counting the number of processed entries in the
/// [tables::AccountChangeSet] table within the given block range.
fn stage_checkpoint<DB: Database>(
provider: &DatabaseProviderRW<'_, &DB>,
checkpoint: StageCheckpoint,
range: &RangeInclusive<BlockNumber>,
) -> Result<IndexHistoryCheckpoint, DatabaseError> {
Ok(match checkpoint.index_history_stage_checkpoint() {
Some(stage_checkpoint @ IndexHistoryCheckpoint { block_range, .. })
if block_range == CheckpointBlockRange::from(range) =>
{
stage_checkpoint
}
Some(IndexHistoryCheckpoint { block_range, progress })
if block_range.to == checkpoint.block_number =>
{
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange::from(range),
progress: EntitiesCheckpoint {
processed: progress.processed,
total: provider.tx_ref().entries::<tables::AccountChangeSet>()? as u64,
},
}
}
_ => IndexHistoryCheckpoint {
block_range: CheckpointBlockRange::from(range),
progress: EntitiesCheckpoint {
processed: provider
.tx_ref()
.cursor_read::<tables::AccountChangeSet>()?
.walk_range(0..=checkpoint.block_number)?
.count() as u64,
total: provider.tx_ref().entries::<tables::AccountChangeSet>()? as u64,
},
},
})
}
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use reth_provider::ProviderFactory;
use std::collections::BTreeMap;
@ -213,18 +137,7 @@ mod tests {
let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone());
let mut provider = factory.provider_rw().unwrap();
let out = stage.execute(&mut provider, input).await.unwrap();
assert_eq!(
out,
ExecOutput {
checkpoint: StageCheckpoint::new(5).with_index_history_stage_checkpoint(
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange { from: input.next_block(), to: run_to },
progress: EntitiesCheckpoint { processed: 2, total: 2 }
}
),
done: true
}
);
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true });
provider.commit().unwrap();
}
@ -437,116 +350,4 @@ mod tests {
])
);
}
#[tokio::test]
async fn stage_checkpoint_range() {
// init
let test_tx = TestTransaction::default();
// setup
partial_setup(&test_tx);
// run
{
let mut stage = IndexAccountHistoryStage { commit_threshold: 4 }; // Two runs required
let factory = ProviderFactory::new(&test_tx.tx, MAINNET.clone());
let mut provider = factory.provider_rw().unwrap();
let mut input = ExecInput { target: Some(5), ..Default::default() };
let out = stage.execute(&mut provider, input).await.unwrap();
assert_eq!(
out,
ExecOutput {
checkpoint: StageCheckpoint::new(4).with_index_history_stage_checkpoint(
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange { from: 1, to: 5 },
progress: EntitiesCheckpoint { processed: 1, total: 2 }
}
),
done: false
}
);
input.checkpoint = Some(out.checkpoint);
let out = stage.execute(&mut provider, input).await.unwrap();
assert_eq!(
out,
ExecOutput {
checkpoint: StageCheckpoint::new(5).with_index_history_stage_checkpoint(
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange { from: 5, to: 5 },
progress: EntitiesCheckpoint { processed: 2, total: 2 }
}
),
done: true
}
);
provider.commit().unwrap();
}
// verify
let table = cast(test_tx.table::<tables::AccountHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![4, 5])]));
// unwind
unwind(&test_tx, 5, 0).await;
// verify initial state
let table = test_tx.table::<tables::AccountHistory>().unwrap();
assert!(table.is_empty());
}
#[test]
fn stage_checkpoint_recalculation() {
let tx = TestTransaction::default();
tx.commit(|tx| {
tx.put::<tables::AccountChangeSet>(
1,
AccountBeforeTx {
address: H160(hex!("0000000000000000000000000000000000000001")),
info: None,
},
)
.unwrap();
tx.put::<tables::AccountChangeSet>(
1,
AccountBeforeTx {
address: H160(hex!("0000000000000000000000000000000000000002")),
info: None,
},
)
.unwrap();
tx.put::<tables::AccountChangeSet>(
2,
AccountBeforeTx {
address: H160(hex!("0000000000000000000000000000000000000001")),
info: None,
},
)
.unwrap();
tx.put::<tables::AccountChangeSet>(
2,
AccountBeforeTx {
address: H160(hex!("0000000000000000000000000000000000000002")),
info: None,
},
)
.unwrap();
Ok(())
})
.unwrap();
let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone());
let provider = factory.provider_rw().unwrap();
assert_matches!(
stage_checkpoint(&provider, StageCheckpoint::new(1), &(1..=2)).unwrap(),
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange { from: 1, to: 2 },
progress: EntitiesCheckpoint { processed: 2, total: 4 }
}
);
}
}

View File

@ -1,16 +1,8 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_db::{
cursor::DbCursorRO, database::Database, models::BlockNumberAddress, tables, transaction::DbTx,
DatabaseError,
};
use reth_primitives::{
stage::{
CheckpointBlockRange, EntitiesCheckpoint, IndexHistoryCheckpoint, StageCheckpoint, StageId,
},
BlockNumber,
};
use reth_db::{database::Database, models::BlockNumberAddress};
use reth_primitives::stage::{StageCheckpoint, StageId};
use reth_provider::DatabaseProviderRW;
use std::{fmt::Debug, ops::RangeInclusive};
use std::fmt::Debug;
/// Stage is indexing history the account changesets generated in
/// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information
@ -47,26 +39,10 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
let mut stage_checkpoint = stage_checkpoint(
provider,
input.checkpoint(),
// It is important to provide the full block range into the checkpoint,
// not the one accounting for commit threshold, to get the correct range end.
&input.next_block_range(),
)?;
let indices = provider.get_storage_transition_ids_from_changeset(range.clone())?;
let changesets = indices.values().map(|blocks| blocks.len() as u64).sum::<u64>();
provider.insert_storage_history_index(indices)?;
stage_checkpoint.progress.processed += changesets;
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(*range.end())
.with_index_history_stage_checkpoint(stage_checkpoint),
done: is_final_range,
})
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: is_final_range })
}
/// Unwind the stage.
@ -78,71 +54,14 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
let (range, unwind_progress, _) =
input.unwind_block_range_with_threshold(self.commit_threshold);
let changesets =
provider.unwind_storage_history_indices(BlockNumberAddress::range(range))?;
provider.unwind_storage_history_indices(BlockNumberAddress::range(range))?;
let checkpoint =
if let Some(mut stage_checkpoint) = input.checkpoint.index_history_stage_checkpoint() {
stage_checkpoint.progress.processed -= changesets as u64;
StageCheckpoint::new(unwind_progress)
.with_index_history_stage_checkpoint(stage_checkpoint)
} else {
StageCheckpoint::new(unwind_progress)
};
Ok(UnwindOutput { checkpoint })
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
}
}
/// The function proceeds as follows:
/// 1. It first checks if the checkpoint has an [IndexHistoryCheckpoint] that matches the given
/// block range. If it does, the function returns that checkpoint.
/// 2. If the checkpoint's block range end matches the current checkpoint's block number, it creates
/// a new [IndexHistoryCheckpoint] with the given block range and updates the progress with the
/// current progress.
/// 3. If none of the above conditions are met, it creates a new [IndexHistoryCheckpoint] with the
/// given block range and calculates the progress by counting the number of processed entries in the
/// [tables::StorageChangeSet] table within the given block range.
fn stage_checkpoint<DB: Database>(
provider: &DatabaseProviderRW<'_, &DB>,
checkpoint: StageCheckpoint,
range: &RangeInclusive<BlockNumber>,
) -> Result<IndexHistoryCheckpoint, DatabaseError> {
Ok(match checkpoint.index_history_stage_checkpoint() {
Some(stage_checkpoint @ IndexHistoryCheckpoint { block_range, .. })
if block_range == CheckpointBlockRange::from(range) =>
{
stage_checkpoint
}
Some(IndexHistoryCheckpoint { block_range, progress })
if block_range.to == checkpoint.block_number =>
{
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange::from(range),
progress: EntitiesCheckpoint {
processed: progress.processed,
total: provider.tx_ref().entries::<tables::StorageChangeSet>()? as u64,
},
}
}
_ => IndexHistoryCheckpoint {
block_range: CheckpointBlockRange::from(range),
progress: EntitiesCheckpoint {
processed: provider
.tx_ref()
.cursor_read::<tables::StorageChangeSet>()?
.walk_range(BlockNumberAddress::range(0..=checkpoint.block_number))?
.count() as u64,
total: provider.tx_ref().entries::<tables::StorageChangeSet>()? as u64,
},
},
})
}
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use reth_provider::ProviderFactory;
use std::collections::BTreeMap;
@ -226,18 +145,7 @@ mod tests {
let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone());
let mut provider = factory.provider_rw().unwrap();
let out = stage.execute(&mut provider, input).await.unwrap();
assert_eq!(
out,
ExecOutput {
checkpoint: StageCheckpoint::new(5).with_index_history_stage_checkpoint(
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange { from: input.next_block(), to: run_to },
progress: EntitiesCheckpoint { processed: 2, total: 2 }
}
),
done: true
}
);
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true });
provider.commit().unwrap();
}
@ -453,126 +361,4 @@ mod tests {
])
);
}
#[tokio::test]
async fn stage_checkpoint_range() {
// init
let test_tx = TestTransaction::default();
// setup
partial_setup(&test_tx);
// run
{
let mut stage = IndexStorageHistoryStage { commit_threshold: 4 }; // Two runs required
let factory = ProviderFactory::new(&test_tx.tx, MAINNET.clone());
let mut provider = factory.provider_rw().unwrap();
let mut input = ExecInput { target: Some(5), ..Default::default() };
let out = stage.execute(&mut provider, input).await.unwrap();
assert_eq!(
out,
ExecOutput {
checkpoint: StageCheckpoint::new(4).with_index_history_stage_checkpoint(
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange { from: 1, to: 5 },
progress: EntitiesCheckpoint { processed: 1, total: 2 }
}
),
done: false
}
);
input.checkpoint = Some(out.checkpoint);
let out = stage.execute(&mut provider, input).await.unwrap();
assert_eq!(
out,
ExecOutput {
checkpoint: StageCheckpoint::new(5).with_index_history_stage_checkpoint(
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange { from: 5, to: 5 },
progress: EntitiesCheckpoint { processed: 2, total: 2 }
}
),
done: true
}
);
provider.commit().unwrap();
}
// verify
let table = cast(test_tx.table::<tables::StorageHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![4, 5])]));
// unwind
unwind(&test_tx, 5, 0).await;
// verify initial state
let table = test_tx.table::<tables::StorageHistory>().unwrap();
assert!(table.is_empty());
}
#[test]
fn stage_checkpoint_recalculation() {
let tx = TestTransaction::default();
tx.commit(|tx| {
tx.put::<tables::StorageChangeSet>(
BlockNumberAddress((1, H160(hex!("0000000000000000000000000000000000000001")))),
storage(H256(hex!(
"0000000000000000000000000000000000000000000000000000000000000001"
))),
)
.unwrap();
tx.put::<tables::StorageChangeSet>(
BlockNumberAddress((1, H160(hex!("0000000000000000000000000000000000000001")))),
storage(H256(hex!(
"0000000000000000000000000000000000000000000000000000000000000002"
))),
)
.unwrap();
tx.put::<tables::StorageChangeSet>(
BlockNumberAddress((1, H160(hex!("0000000000000000000000000000000000000002")))),
storage(H256(hex!(
"0000000000000000000000000000000000000000000000000000000000000001"
))),
)
.unwrap();
tx.put::<tables::StorageChangeSet>(
BlockNumberAddress((2, H160(hex!("0000000000000000000000000000000000000001")))),
storage(H256(hex!(
"0000000000000000000000000000000000000000000000000000000000000001"
))),
)
.unwrap();
tx.put::<tables::StorageChangeSet>(
BlockNumberAddress((2, H160(hex!("0000000000000000000000000000000000000001")))),
storage(H256(hex!(
"0000000000000000000000000000000000000000000000000000000000000002"
))),
)
.unwrap();
tx.put::<tables::StorageChangeSet>(
BlockNumberAddress((2, H160(hex!("0000000000000000000000000000000000000002")))),
storage(H256(hex!(
"0000000000000000000000000000000000000000000000000000000000000001"
))),
)
.unwrap();
Ok(())
})
.unwrap();
let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone());
let provider = factory.provider_rw().unwrap();
assert_matches!(
stage_checkpoint(&provider, StageCheckpoint::new(1), &(1..=2)).unwrap(),
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange { from: 1, to: 2 },
progress: EntitiesCheckpoint { processed: 3, total: 6 }
}
);
}
}