chore(provider): simplify history unwind (#3355)

This commit is contained in:
Roman Krasiuk
2023-06-23 18:47:55 +03:00
committed by GitHub
parent be58d1c286
commit 2b1a34116d
5 changed files with 105 additions and 109 deletions

1
Cargo.lock generated
View File

@ -5128,6 +5128,7 @@ dependencies = [
"async-trait",
"bytes",
"criterion",
"derive_more",
"futures",
"heapless",
"iai",

View File

@ -37,6 +37,7 @@ page_size = "0.4.2"
thiserror = { workspace = true }
tempfile = { version = "3.3.0", optional = true }
parking_lot = "0.12"
derive_more = "0.99"
# arbitrary utils
arbitrary = { version = "1.1.7", features = ["derive"], optional = true }

View File

@ -24,6 +24,12 @@ pub struct ShardedKey<T> {
pub highest_block_number: BlockNumber,
}
impl<T> AsRef<ShardedKey<T>> for ShardedKey<T> {
fn as_ref(&self) -> &ShardedKey<T> {
self
}
}
impl<T> ShardedKey<T> {
/// Creates a new `ShardedKey<T>`.
pub fn new(key: T, highest_block_number: BlockNumber) -> Self {

View File

@ -4,7 +4,7 @@ use crate::{
table::{Decode, Encode},
DatabaseError,
};
use derive_more::AsRef;
use reth_primitives::{BlockNumber, H160, H256};
use serde::{Deserialize, Serialize};
@ -19,11 +19,12 @@ pub const NUM_OF_INDICES_IN_SHARD: usize = 2_000;
/// `Address | Storagekey | 200` -> data is from transition 0 to 200.
///
/// `Address | StorageKey | 300` -> data is from transition 201 to 300.
#[derive(Debug, Default, Clone, Eq, Ord, PartialOrd, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Default, Clone, Eq, Ord, PartialOrd, PartialEq, AsRef, Serialize, Deserialize)]
pub struct StorageShardedKey {
/// Storage account address.
pub address: H160,
/// Sharded key of the storage slot: the slot key (`key`) together with the shard's
/// `highest_block_number`.
// Marked with `#[as_ref]` so that the derived `AsRef` implementation hands out a reference
// to this field (NOTE(review): relies on `derive_more::AsRef` field-attribute semantics).
#[as_ref]
pub sharded_key: ShardedKey<H256>,
}

View File

@ -18,7 +18,7 @@ use reth_db::{
},
table::Table,
tables,
transaction::{DbTx, DbTxMut, DbTxMutGAT},
transaction::{DbTx, DbTxMut},
BlockNumberList, DatabaseError,
};
use reth_interfaces::Result;
@ -102,75 +102,51 @@ impl<'this, TX: DbTxMut<'this>> DatabaseProvider<'this, TX> {
}
}
/// Unwind all history shards. For boundary shard, remove it from database and
/// return last part of shard with still valid items. If all full shard were removed, return list
/// would be empty.
fn unwind_account_history_shards<'a, TX: reth_db::transaction::DbTxMutGAT<'a>>(
cursor: &mut <TX as DbTxMutGAT<'a>>::CursorMut<tables::AccountHistory>,
address: Address,
/// For a given key, unwind all history shards that hold indices at or above the given block
/// number.
///
/// S - Sharded key subtype.
/// T - Table to walk over.
/// C - Cursor implementation.
///
/// This function walks the entries backwards from the given start key and deletes every shard
/// that belongs to the key and whose first index is at or above the given block number.
///
/// Walking stops at the first shard whose first index is below the block number. That shard is
/// removed from the database as well. If it is the boundary shard (the shard is split by the
/// block number), any indices that are at or above the block number are filtered out; otherwise
/// the whole shard is kept. The resulting list is returned for reinsertion (if it's not empty).
fn unwind_history_shards<'a, S, T, C>(
cursor: &mut C,
start_key: T::Key,
block_number: BlockNumber,
) -> Result<Vec<usize>> {
let mut item = cursor.seek_exact(ShardedKey::new(address, u64::MAX))?;
mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
) -> Result<Vec<usize>>
where
T: Table<Value = BlockNumberList>,
T::Key: AsRef<ShardedKey<S>>,
C: DbCursorRO<'a, T> + DbCursorRW<'a, T>,
{
let mut item = cursor.seek_exact(start_key)?;
while let Some((sharded_key, list)) = item {
// there is no more shard for address
if sharded_key.key != address {
// If the shard does not belong to the key, break.
if !shard_belongs_to_key(&sharded_key) {
break
}
cursor.delete_current()?;
// check first item and if it is more and eq than `block_number` delete current
// item.
let first = list.iter(0).next().expect("List can't empty");
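// Check the first item of the shard (the shard itself has already been deleted above).
// If it is greater than or equal to the block number, the whole shard is being unwound:
// it stays deleted and we move on to the previous shard.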
let first = list.iter(0).next().expect("List can't be empty");
if first >= block_number as usize {
item = cursor.prev()?;
continue
} else if block_number <= sharded_key.highest_block_number {
// if first element is in scope whole list would be removed.
// so at least this first element is present.
return Ok(list.iter(0).take_while(|i| *i < block_number as usize).collect::<Vec<_>>())
} else {
let new_list = list.iter(0).collect::<Vec<_>>();
return Ok(new_list)
}
}
Ok(Vec::new())
}
/// Unwind all history shards. For boundary shard, remove it from database and
/// return last part of shard with still valid items. If all full shard were removed, return list
/// would be empty but this does not mean that there is none shard left but that there is no
/// split shards.
fn unwind_storage_history_shards<'a, TX: reth_db::transaction::DbTxMutGAT<'a>>(
cursor: &mut <TX as DbTxMutGAT<'a>>::CursorMut<tables::StorageHistory>,
address: Address,
storage_key: H256,
block_number: BlockNumber,
) -> Result<Vec<usize>> {
let mut item = cursor.seek_exact(StorageShardedKey::new(address, storage_key, u64::MAX))?;
while let Some((storage_sharded_key, list)) = item {
// there is no more shard for address
if storage_sharded_key.address != address ||
storage_sharded_key.sharded_key.key != storage_key
{
// there is no more shard for address and storage_key.
break
}
cursor.delete_current()?;
// check first item and if it is more and eq than `block_number` delete current
// item.
let first = list.iter(0).next().expect("List can't empty");
if first >= block_number as usize {
item = cursor.prev()?;
continue
} else if block_number <= storage_sharded_key.sharded_key.highest_block_number {
// if first element is in scope whole list would be removed.
// so at least this first element is present.
} else if block_number <= sharded_key.as_ref().highest_block_number {
// Filter out all elements greater than or equal to the block number.
return Ok(list.iter(0).take_while(|i| *i < block_number as usize).collect::<Vec<_>>())
} else {
return Ok(list.iter(0).collect::<Vec<_>>())
}
}
Ok(Vec::new())
}
@ -1651,47 +1627,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HistoryWriter for DatabaseProvider
Ok(())
}
/// Unwinds the storage history index for every storage slot that has a changeset in `range`.
///
/// Reads all storage changesets in `range`, picks one block number per `(address, slot)` pair
/// and unwinds the history shards of that slot from that block number. A non-empty remainder
/// returned by the shard unwind is written back under the `u64::MAX` shard key.
///
/// Returns the number of storage changeset entries read from `range`.
fn unwind_storage_history_indices(&self, range: Range<BlockNumberAddress>) -> Result<usize> {
    let entries = self
        .tx
        .cursor_read::<tables::StorageChangeSet>()?
        .walk_range(range)?
        .collect::<std::result::Result<Vec<_>, _>>()?;
    let changesets = entries.len();

    // Keep the block number of the first changeset (in walk order) for each storage slot.
    // NOTE(review): presumably the lowest block number, assuming the walk is ascending.
    let mut last_indices: BTreeMap<(Address, H256), u64> = BTreeMap::new();
    for (index, storage) in entries {
        last_indices.entry((index.address(), storage.key)).or_insert(index.block_number());
    }

    let mut cursor = self.tx.cursor_write::<tables::StorageHistory>()?;
    for ((address, storage_key), rem_index) in last_indices {
        let remainder =
            unwind_storage_history_shards::<TX>(&mut cursor, address, storage_key, rem_index)?;

        // Nothing survived the unwind for this slot, so there is nothing to reinsert.
        if remainder.is_empty() {
            continue
        }

        // Surviving indices go back in as the last shard of the slot.
        let last_shard_key = StorageShardedKey::new(address, storage_key, u64::MAX);
        let list = BlockNumberList::new(remainder)
            .expect("There is at least one element in list and it is sorted.");
        self.tx.put::<tables::StorageHistory>(last_shard_key, list)?;
    }

    Ok(changesets)
}
fn insert_account_history_index(
&self,
account_transitions: BTreeMap<Address, Vec<u64>>,
@ -1744,6 +1679,53 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HistoryWriter for DatabaseProvider
Ok(())
}
/// Unwinds the storage history index for every storage slot that has a changeset in `range`.
///
/// Reads all storage changesets in `range`, picks one block number per `(address, slot)` pair
/// and unwinds the history shards of that slot from that block number. A non-empty partial
/// shard returned by the unwind is reinserted under the slot's last shard key.
///
/// Returns the number of storage changeset entries read from `range`.
fn unwind_storage_history_indices(&self, range: Range<BlockNumberAddress>) -> Result<usize> {
    let entries = self
        .tx
        .cursor_read::<tables::StorageChangeSet>()?
        .walk_range(range)?
        .collect::<std::result::Result<Vec<_>, _>>()?;
    let changesets = entries.len();

    // Keep the block number of the first changeset (in walk order) for each storage slot.
    // NOTE(review): presumably the lowest block number, assuming the walk is ascending.
    let mut last_indices: BTreeMap<(Address, H256), u64> = BTreeMap::new();
    for (index, storage) in entries {
        last_indices.entry((index.address(), storage.key)).or_insert(index.block_number());
    }

    let mut cursor = self.tx.cursor_write::<tables::StorageHistory>()?;
    for ((address, storage_key), rem_index) in last_indices {
        let partial_shard = unwind_history_shards::<_, tables::StorageHistory, _>(
            &mut cursor,
            StorageShardedKey::last(address, storage_key),
            rem_index,
            // A shard belongs to this slot only if both the account and the slot key match.
            |shard_key| shard_key.address == address && shard_key.sharded_key.key == storage_key,
        )?;

        // Nothing survived the unwind for this slot, so there is nothing to reinsert.
        if partial_shard.is_empty() {
            continue
        }

        // Surviving indices go back in as the last shard of the slot.
        let last_shard_key = StorageShardedKey::last(address, storage_key);
        cursor.insert(last_shard_key, BlockNumberList::new_pre_sorted(partial_shard))?;
    }

    Ok(changesets)
}
fn unwind_account_history_indices(&self, range: RangeInclusive<BlockNumber>) -> Result<usize> {
let account_changeset = self
.tx
@ -1762,18 +1744,23 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HistoryWriter for DatabaseProvider
accounts.insert(account.address, index);
accounts
});
// try to unwind the index
// Unwind the account history index.
let mut cursor = self.tx.cursor_write::<tables::AccountHistory>()?;
for (address, rem_index) in last_indices {
let shard_part = unwind_account_history_shards::<TX>(&mut cursor, address, rem_index)?;
let partial_shard = unwind_history_shards::<_, tables::AccountHistory, _>(
&mut cursor,
ShardedKey::last(address),
rem_index,
|sharded_key| sharded_key.key == address,
)?;
// check last shard_part, if present, items needs to be reinserted.
if !shard_part.is_empty() {
// there are items in list
self.tx.put::<tables::AccountHistory>(
ShardedKey::new(address, u64::MAX),
BlockNumberList::new(shard_part)
.expect("There is at least one element in list and it is sorted."),
// Check the last returned partial shard.
// If it's not empty, the shard needs to be reinserted.
if !partial_shard.is_empty() {
cursor.insert(
ShardedKey::last(address),
BlockNumberList::new_pre_sorted(partial_shard),
)?;
}
}