fix(engine, pruner): prune poll logic, history indices (#4043)

Alexey Shekhirin
2023-08-03 15:54:16 +01:00
committed by GitHub
parent 8d0aa64ab8
commit f917d49fb4
4 changed files with 161 additions and 150 deletions


@@ -51,6 +51,18 @@ impl ForkchoiceStateTracker {
self.latest_status().map(|s| s.is_valid()).unwrap_or(false)
}
+ /// Returns whether the latest received FCU is syncing: [ForkchoiceStatus::Syncing]
+ #[allow(unused)]
+ pub(crate) fn is_latest_syncing(&self) -> bool {
+ self.latest_status().map(|s| s.is_syncing()).unwrap_or(false)
+ }
+ /// Returns whether the latest received FCU is invalid: [ForkchoiceStatus::Invalid]
+ #[allow(unused)]
+ pub(crate) fn is_latest_invalid(&self) -> bool {
+ self.latest_status().map(|s| s.is_invalid()).unwrap_or(false)
+ }
/// Returns the last valid head hash.
#[allow(unused)]
pub(crate) fn last_valid_head(&self) -> Option<H256> {
@@ -98,6 +110,10 @@ impl ForkchoiceStatus {
matches!(self, ForkchoiceStatus::Valid)
}
+ pub(crate) fn is_invalid(&self) -> bool {
+ matches!(self, ForkchoiceStatus::Invalid)
+ }
pub(crate) fn is_syncing(&self) -> bool {
matches!(self, ForkchoiceStatus::Syncing)
}

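The `unwrap_or(false)` default on these predicates matters for the engine change below: on a tracker that has not yet received any FCU, `is_latest_invalid()` is `false`, so a `!is_latest_invalid()` gate passes before the first forkchoice update arrives. A minimal standalone sketch of that behavior (hypothetical `Tracker` type, not the reth implementation):

```rust
/// Hypothetical stand-in for reth's `ForkchoiceStateTracker`, reduced to the
/// pattern above: the latest status is optional, and every predicate defaults
/// to `false` when no forkchoice update (FCU) has been received yet.
#[allow(dead_code)]
#[derive(Clone, Copy, PartialEq, Eq)]
enum ForkchoiceStatus {
    Valid,
    Invalid,
    Syncing,
}

struct Tracker {
    latest: Option<ForkchoiceStatus>,
}

impl Tracker {
    fn is_latest_invalid(&self) -> bool {
        // Mirrors `latest_status().map(|s| s.is_invalid()).unwrap_or(false)`.
        self.latest.map(|s| matches!(s, ForkchoiceStatus::Invalid)).unwrap_or(false)
    }
}

fn main() {
    // A fresh tracker reports "not invalid", so a `!is_latest_invalid()` gate
    // passes before the first FCU arrives.
    assert!(!Tracker { latest: None }.is_latest_invalid());
    assert!(Tracker { latest: Some(ForkchoiceStatus::Invalid) }.is_latest_invalid());
}
```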

@@ -1715,12 +1715,13 @@ where
// Poll prune controller if all conditions are met:
// 1. Pipeline is idle
- // 2. Pruning is running and we need to prioritize checking its events OR no engine and
- // sync messages are pending and we may start pruning
- // 3. Latest FCU status is VALID
+ // 2. Either of two:
+ // 1. Pruning is running and we need to prioritize checking its events
+ // 2. Both engine and sync messages are pending AND latest FCU status is not INVALID,
+ // so we may start pruning
if this.sync.is_pipeline_idle() &&
- (this.is_prune_active() || is_pending) &&
- this.forkchoice_state_tracker.is_latest_valid()
+ (this.is_prune_active() ||
+ is_pending && !this.forkchoice_state_tracker.is_latest_invalid())
{
if let Some(ref mut prune) = this.prune {
match prune.poll(cx, this.blockchain.canonical_tip().number) {

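Restated outside the event loop, the condition change amounts to the following predicate (a hypothetical `should_poll_pruner` helper, not code from the commit; `messages_pending` here means both the engine and sync polls returned `Poll::Pending`, i.e. no work is queued):

```rust
/// Hypothetical restatement of the new pruner gate. The old gate required the
/// latest FCU to be VALID even when the pruner was already running; the new
/// one always polls a running pruner and only consults the FCU status (any
/// status except INVALID) when deciding whether to *start* pruning.
fn should_poll_pruner(
    pipeline_idle: bool,
    prune_active: bool,
    messages_pending: bool,
    latest_fcu_invalid: bool,
) -> bool {
    pipeline_idle && (prune_active || (messages_pending && !latest_fcu_invalid))
}

fn main() {
    // A running pruner is polled regardless of the latest FCU status...
    assert!(should_poll_pruner(true, true, false, true));
    // ...but a new prune run does not start after an INVALID FCU.
    assert!(!should_poll_pruner(true, false, true, true));
}
```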

@@ -6,12 +6,13 @@ use reth_db::{
abstraction::cursor::{DbCursorRO, DbCursorRW},
database::Database,
models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress, ShardedKey},
+ table::Table,
tables,
transaction::DbTxMut,
BlockNumberList,
};
use reth_primitives::{
- Address, BlockNumber, ChainSpec, PruneCheckpoint, PruneMode, PruneModes, PrunePart, TxNumber,
+ BlockNumber, ChainSpec, PruneCheckpoint, PruneMode, PruneModes, PrunePart, TxNumber,
};
use reth_provider::{
BlockReader, DatabaseProviderRW, ProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
@@ -81,6 +82,11 @@ impl<DB: Database> Pruner<DB> {
/// Run the pruner
pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult {
+ trace!(
+ target: "pruner",
+ %tip_block_number,
+ "Pruner started"
+ );
let start = Instant::now();
let provider = self.provider_factory.provider_rw()?;
@@ -143,8 +149,15 @@ impl<DB: Database> Pruner<DB> {
provider.commit()?;
self.last_pruned_block_number = Some(tip_block_number);
- self.metrics.pruner.duration_seconds.record(start.elapsed());
+ let elapsed = start.elapsed();
+ self.metrics.pruner.duration_seconds.record(elapsed);
+ trace!(
+ target: "pruner",
+ %tip_block_number,
+ ?elapsed,
+ "Pruner finished"
+ );
Ok(())
}
@@ -228,11 +241,11 @@ impl<DB: Database> Pruner<DB> {
provider.prune_table_with_iterator_in_batches::<tables::Receipts>(
range,
self.batch_sizes.receipts,
- |entries| {
- processed += entries;
+ |rows| {
+ processed += rows;
trace!(
target: "pruner",
- %entries,
+ %rows,
progress = format!("{:.1}%", 100.0 * processed as f64 / total as f64),
"Pruned receipts"
);
@@ -293,11 +306,11 @@ impl<DB: Database> Pruner<DB> {
// Pre-sort hashes to prune them in order
hashes.sort_unstable();
- let entries = provider.prune_table_with_iterator::<tables::TxHashNumber>(hashes)?;
- processed += entries;
+ let rows = provider.prune_table_with_iterator::<tables::TxHashNumber>(hashes)?;
+ processed += rows;
trace!(
target: "pruner",
- %entries,
+ %rows,
progress = format!("{:.1}%", 100.0 * processed as f64 / total as f64),
"Pruned transaction lookup"
);
@@ -336,11 +349,11 @@ impl<DB: Database> Pruner<DB> {
provider.prune_table_with_range_in_batches::<tables::TxSenders>(
range,
self.batch_sizes.transaction_senders,
- |entries| {
- processed += entries;
+ |rows, _| {
+ processed += rows;
trace!(
target: "pruner",
- %entries,
+ %rows,
progress = format!("{:.1}%", 100.0 * processed as f64 / total as f64),
"Pruned transaction senders"
);
@@ -370,92 +383,34 @@ impl<DB: Database> Pruner<DB> {
let range = from_block..=to_block;
let total = range.clone().count();
- let mut processed = 0;
provider.prune_table_with_range_in_batches::<tables::AccountChangeSet>(
range,
self.batch_sizes.account_history,
- |entries| {
- processed += entries;
+ |keys, rows| {
trace!(
target: "pruner",
- %entries,
- progress = format!("{:.1}%", 100.0 * processed as f64 / total as f64),
+ %keys,
+ %rows,
+ progress = format!("{:.1}%", 100.0 * keys as f64 / total as f64),
"Pruned account history (changesets)"
);
},
)?;
- let mut cursor = provider.tx_ref().cursor_write::<tables::AccountHistory>()?;
- // Prune `AccountHistory` table:
- // 1. If the shard has `highest_block_number` less than or equal to the target block number
- // for pruning, delete the shard completely.
- // 2. If the shard has `highest_block_number` greater than the target block number for
- // pruning, filter block numbers inside the shard which are less than the target
- // block number for pruning.
- while let Some(result) = cursor.next()? {
- let (key, blocks): (ShardedKey<Address>, BlockNumberList) = result;
- if key.highest_block_number <= to_block {
- // If shard consists only of block numbers less than the target one, delete shard
- // completely.
- cursor.delete_current()?;
- if key.highest_block_number == to_block {
- // Shard contains only block numbers up to the target one, so we can skip to the
- // next address. It is guaranteed that further shards for this address will not
- // contain the target block number, as it's in this shard.
- cursor.seek_exact(ShardedKey::last(key.key))?;
- }
- } else {
- // Shard contains block numbers that are higher than the target one, so we need to
- // filter it. It is guaranteed that further shards for this address will not contain
- // the target block number, as it's in this shard.
- let blocks = blocks
- .iter(0)
- .skip_while(|block| *block <= to_block as usize)
- .collect::<Vec<_>>();
- if blocks.is_empty() {
- // If there are no more blocks in this shard, we need to remove it, as empty
- // shards are not allowed.
- if key.highest_block_number == u64::MAX {
- // If current shard is the last shard for this address, replace it with the
- // previous shard.
- if let Some((prev_key, prev_value)) = cursor.prev()? {
- if prev_key.key == key.key {
- cursor.delete_current()?;
- // Upsert will replace the last shard for this address with the
- // previous value
- cursor.upsert(key.clone(), prev_value)?;
- }
- }
- } else {
- // If current shard is not the last shard for this address, just delete it.
- cursor.delete_current()?;
- }
- } else {
- cursor.upsert(key.clone(), BlockNumberList::new_pre_sorted(blocks))?;
- }
- // Jump to the next address
- cursor.seek_exact(ShardedKey::last(key.key))?;
- }
- processed += 1;
- if processed % self.batch_sizes.account_history == 0 {
- trace!(
- target: "pruner",
- entries = self.batch_sizes.account_history,
- "Pruned account history (indices)"
- );
- }
- }
- if processed % self.batch_sizes.account_history != 0 {
- trace!(
- target: "pruner",
- entries = processed % self.batch_sizes.account_history,
- "Pruned account history (indices)"
- );
- }
+ self.prune_history_indices::<tables::AccountHistory, _>(
+ provider,
+ to_block,
+ |a, b| a.key == b.key,
+ |key| ShardedKey::last(key.key),
+ self.batch_sizes.account_history,
+ |rows| {
+ trace!(
+ target: "pruner",
+ rows,
+ "Pruned account history (indices)"
+ );
+ },
+ )?;
provider.save_prune_checkpoint(
PrunePart::AccountHistory,
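Because the provider callback now reports a cumulative unique-key count, the changeset progress is computed directly from `keys` instead of a separately accumulated row counter. A small sketch of that formula (assuming, as in the hunk above, that `keys` counts unique pruned keys and `total` is the size of the block range):

```rust
/// Progress string as computed in the trace call above: cumulative unique
/// keys pruned over the total number of blocks in the prune range.
fn progress(keys: usize, total: usize) -> String {
    format!("{:.1}%", 100.0 * keys as f64 / total as f64)
}

fn main() {
    // 250 of 1000 blocks' changesets pruned so far -> "25.0%".
    assert_eq!(progress(250, 1000), "25.0%");
}
```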
@@ -478,104 +433,136 @@ impl<DB: Database> Pruner<DB> {
.map(|checkpoint| checkpoint.block_number + 1)
.unwrap_or_default();
let block_range = from_block..=to_block;
- let total = block_range.clone().count();
let range = BlockNumberAddress::range(block_range);
- let mut processed = 0;
provider.prune_table_with_range_in_batches::<tables::StorageChangeSet>(
range,
self.batch_sizes.storage_history,
- |entries| {
- processed += entries;
+ |keys, rows| {
trace!(
target: "pruner",
- %entries,
- progress = format!("{:.1}%", 100.0 * processed as f64 / total as f64),
+ %keys,
+ %rows,
"Pruned storage history (changesets)"
);
},
)?;
- let mut cursor = provider.tx_ref().cursor_write::<tables::StorageHistory>()?;
- // Prune `StorageHistory` table:
+ self.prune_history_indices::<tables::StorageHistory, _>(
+ provider,
+ to_block,
+ |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
+ |key| StorageShardedKey::last(key.address, key.sharded_key.key),
+ self.batch_sizes.storage_history,
+ |rows| {
+ trace!(
+ target: "pruner",
+ rows,
+ "Pruned storage history (indices)"
+ );
+ },
+ )?;
+ provider.save_prune_checkpoint(
+ PrunePart::StorageHistory,
+ PruneCheckpoint { block_number: to_block, prune_mode },
+ )?;
+ Ok(())
+ }
+ /// Prune history indices up to the provided block, inclusive.
+ fn prune_history_indices<T, SK>(
+ &self,
+ provider: &DatabaseProviderRW<'_, DB>,
+ to_block: BlockNumber,
+ key_matches: impl Fn(&T::Key, &T::Key) -> bool,
+ last_key: impl Fn(&T::Key) -> T::Key,
+ batch_size: usize,
+ batch_callback: impl Fn(usize),
+ ) -> PrunerResult
+ where
+ T: Table<Value = BlockNumberList>,
+ T::Key: AsRef<ShardedKey<SK>>,
+ {
+ let mut processed = 0;
+ let mut cursor = provider.tx_ref().cursor_write::<T>()?;
+ // Prune history table:
// 1. If the shard has `highest_block_number` less than or equal to the target block number
// for pruning, delete the shard completely.
// 2. If the shard has `highest_block_number` greater than the target block number for
// pruning, filter block numbers inside the shard which are less than the target
// block number for pruning.
while let Some(result) = cursor.next()? {
- let (key, blocks): (StorageShardedKey, BlockNumberList) = result;
+ let (key, blocks): (T::Key, BlockNumberList) = result;
- if key.sharded_key.highest_block_number <= to_block {
+ if key.as_ref().highest_block_number <= to_block {
// If shard consists only of block numbers less than the target one, delete shard
// completely.
cursor.delete_current()?;
- if key.sharded_key.highest_block_number == to_block {
+ if key.as_ref().highest_block_number == to_block {
// Shard contains only block numbers up to the target one, so we can skip to the
- // next storage slot for this address. It is guaranteed that further shards for
- // this address and storage slot will not contain the target block number, as
- // it's in this shard.
- cursor.seek_exact(StorageShardedKey::last(key.address, key.sharded_key.key))?;
+ // next sharded key. It is guaranteed that further shards for this sharded key
+ // will not contain the target block number, as it's in this shard.
+ cursor.seek_exact(last_key(&key))?;
}
} else {
// Shard contains block numbers that are higher than the target one, so we need to
- // filter it. It is guaranteed that further shards for this address and storage slot
- // will not contain the target block number, as it's in this shard.
- let blocks = blocks
+ // filter it. It is guaranteed that further shards for this sharded key will not
+ // contain the target block number, as it's in this shard.
+ let new_blocks = blocks
.iter(0)
.skip_while(|block| *block <= to_block as usize)
.collect::<Vec<_>>();
- if blocks.is_empty() {
- // If there are no more blocks in this shard, we need to remove it, as empty
- // shards are not allowed.
- if key.sharded_key.highest_block_number == u64::MAX {
- // If current shard is the last shard for this address and storage slot,
- // replace it with the previous shard.
- if let Some((prev_key, prev_value)) = cursor.prev()? {
- if prev_key.address == key.address &&
- prev_key.sharded_key.key == key.sharded_key.key
+ if blocks.len() != new_blocks.len() {
+ // If there were blocks less than or equal to the target one
+ // (so the shard has changed), update the shard.
+ if new_blocks.is_empty() {
+ // If there are no more blocks in this shard, we need to remove it, as empty
+ // shards are not allowed.
+ if key.as_ref().highest_block_number == u64::MAX {
+ // If current shard is the last shard for this sharded key, replace it
+ // with the previous shard.
+ if let Some(prev_value) = cursor
+ .prev()?
+ .filter(|(prev_key, _)| key_matches(prev_key, &key))
+ .map(|(_, prev_value)| prev_value)
{
cursor.delete_current()?;
- // Upsert will replace the last shard for this address and storage
- // slot with the previous value
+ // Upsert will replace the last shard for this sharded key with the
+ // previous value
cursor.upsert(key.clone(), prev_value)?;
+ } else {
+ // If there's no previous shard for this sharded key,
+ // just delete last shard completely.
+ cursor.delete_current()?;
}
} else {
+ // If current shard is not the last shard for this sharded key,
+ // just delete it.
cursor.delete_current()?;
}
} else {
- // If current shard is not the last shard for this address, just delete it.
- cursor.delete_current()?;
+ cursor.upsert(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))?;
}
- } else {
- cursor.upsert(key.clone(), BlockNumberList::new_pre_sorted(blocks))?;
- }
// Jump to the next address
- cursor.seek_exact(StorageShardedKey::last(key.address, key.sharded_key.key))?;
+ cursor.seek_exact(last_key(&key))?;
}
processed += 1;
- if processed % self.batch_sizes.storage_history == 0 {
- trace!(
- target: "pruner",
- entries = self.batch_sizes.storage_history,
- "Pruned storage history (indices)"
- );
+ if processed % batch_size == 0 {
+ batch_callback(batch_size);
}
}
- if processed % self.batch_sizes.storage_history != 0 {
- trace!(
- target: "pruner",
- entries = processed % self.batch_sizes.storage_history,
- "Pruned storage history (indices)"
- );
+ if processed % batch_size != 0 {
+ batch_callback(processed % batch_size);
}
- provider.save_prune_checkpoint(
- PrunePart::StorageHistory,
- PruneCheckpoint { block_number: to_block, prune_mode },
- )?;
Ok(())
}
}

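The comment block in `prune_history_indices` describes a two-case rule applied per shard. A toy model of how a single shard is resolved against the prune target (plain `u64` vectors instead of `BlockNumberList`, not the reth cursor logic):

```rust
/// Toy model of the per-shard decision in `prune_history_indices`: a shard is
/// keyed by its highest block number and stores a sorted list of blocks.
/// Returns `None` when the shard must be deleted, or the filtered block list
/// when it must be rewritten. (The real code additionally re-links the last
/// shard of a key, which this sketch omits.)
fn resolve_shard(highest_block_number: u64, blocks: Vec<u64>, to_block: u64) -> Option<Vec<u64>> {
    if highest_block_number <= to_block {
        // Case 1: the whole shard is at or below the target -> delete it.
        None
    } else {
        // Case 2: drop the leading blocks <= target; empty shards are not
        // allowed, so a fully drained shard is deleted as well.
        let new_blocks: Vec<u64> = blocks.into_iter().skip_while(|b| *b <= to_block).collect();
        if new_blocks.is_empty() {
            None
        } else {
            Some(new_blocks)
        }
    }
}

fn main() {
    assert_eq!(resolve_shard(90, vec![80, 85, 90], 100), None);
    assert_eq!(resolve_shard(u64::MAX, vec![95, 105, 110], 100), Some(vec![105, 110]));
    assert_eq!(resolve_shard(u64::MAX, vec![95, 99], 100), None);
}
```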

@@ -660,33 +660,40 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
}
/// Prune the table for the specified key range, calling `chunk_callback` after every
- /// `batch_size` pruned rows.
+ /// `batch_size` pruned rows with number of total unique keys and total rows pruned. For dupsort
+ /// tables, these numbers will be different as one key can correspond to multiple rows.
- ///
- /// Returns number of rows pruned.
pub fn prune_table_with_range_in_batches<T: Table>(
&self,
keys: impl RangeBounds<T::Key>,
batch_size: usize,
- mut batch_callback: impl FnMut(usize),
- ) -> std::result::Result<usize, DatabaseError> {
+ mut batch_callback: impl FnMut(usize, usize),
+ ) -> std::result::Result<(), DatabaseError> {
let mut cursor = self.tx.cursor_write::<T>()?;
let mut walker = cursor.walk_range(keys)?;
- let mut deleted = 0;
+ let mut deleted_keys = 0;
+ let mut deleted_rows = 0;
+ let mut previous_key = None;
- while walker.next().transpose()?.is_some() {
+ while let Some((key, _)) = walker.next().transpose()? {
walker.delete_current()?;
- deleted += 1;
+ deleted_rows += 1;
+ if previous_key.as_ref().map(|previous_key| previous_key != &key).unwrap_or(true) {
+ deleted_keys += 1;
+ previous_key = Some(key);
+ }
- if deleted % batch_size == 0 {
- batch_callback(batch_size);
+ if deleted_rows % batch_size == 0 {
+ batch_callback(deleted_keys, deleted_rows);
}
}
- if deleted % batch_size != 0 {
- batch_callback(deleted % batch_size);
+ if deleted_rows % batch_size != 0 {
+ batch_callback(deleted_keys, deleted_rows);
}
- Ok(deleted)
+ Ok(())
}
/// Load shard and remove it. If list is empty, last shard was full or
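The unique-key counting introduced above is what feeds the `|keys, rows|` callbacks in the pruner. A standalone sketch of the same bookkeeping over an in-memory slice (hypothetical `count_in_batches` helper standing in for the cursor walk; in a dupsort table one key maps to many rows, which is why both counters are reported):

```rust
/// Walks (key, value) pairs in key order, mimicking the batch accounting in
/// `prune_table_with_range_in_batches`: rows are counted unconditionally,
/// keys only when they differ from the previous one, and the callback fires
/// every `batch_size` rows plus once for any trailing partial batch.
fn count_in_batches(rows: &[(u64, &str)], batch_size: usize, mut batch_callback: impl FnMut(usize, usize)) {
    let mut deleted_keys = 0;
    let mut deleted_rows = 0;
    let mut previous_key: Option<u64> = None;
    for (key, _value) in rows {
        deleted_rows += 1;
        if previous_key.map(|prev| prev != *key).unwrap_or(true) {
            deleted_keys += 1;
            previous_key = Some(*key);
        }
        if deleted_rows % batch_size == 0 {
            batch_callback(deleted_keys, deleted_rows);
        }
    }
    if deleted_rows % batch_size != 0 {
        batch_callback(deleted_keys, deleted_rows);
    }
}

fn main() {
    // Two duplicate rows under key 1: three rows total, two unique keys.
    let rows = [(1, "a"), (1, "b"), (2, "c")];
    count_in_batches(&rows, 2, |k, r| println!("keys={k} rows={r}"));
    // Prints: keys=1 rows=2, then keys=2 rows=3.
}
```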