chore(provider): simplify history inserts (#3356)

This commit is contained in:
Roman Krasiuk
2023-06-26 16:40:54 +03:00
committed by GitHub
parent 30a1ad2586
commit 9c1b48b983

View File

@ -12,9 +12,8 @@ use reth_db::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
database::{Database, DatabaseGAT},
models::{
sharded_key,
storage_sharded_key::{self, StorageShardedKey},
AccountBeforeTx, BlockNumberAddress, ShardedKey, StoredBlockBodyIndices,
sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
ShardedKey, StoredBlockBodyIndices,
},
table::Table,
tables,
@ -712,20 +711,66 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
Ok(())
}
/// Load last shard and check if it is full and remove if it is not. If list is empty, last
/// shard was full or there is no shards at all.
pub fn take_last_storage_shard(&self, address: Address, storage_key: H256) -> Result<Vec<u64>> {
let mut cursor = self.tx.cursor_read::<tables::StorageHistory>()?;
let last = cursor.seek_exact(StorageShardedKey::new(address, storage_key, u64::MAX))?;
if let Some((storage_shard_key, list)) = last {
/// Load shard and remove it. If list is empty, last shard was full or
/// there are no shards at all.
fn take_shard<T>(&self, key: T::Key) -> Result<Vec<u64>>
where
T: Table<Value = BlockNumberList>,
{
let mut cursor = self.tx.cursor_read::<T>()?;
let shard = cursor.seek_exact(key)?;
if let Some((shard_key, list)) = shard {
// delete old shard so new one can be inserted.
self.tx.delete::<tables::StorageHistory>(storage_shard_key, None)?;
self.tx.delete::<T>(shard_key, None)?;
let list = list.iter(0).map(|i| i as u64).collect::<Vec<_>>();
return Ok(list)
}
Ok(Vec::new())
}
/// Insert history index to the database.
///
/// For each updated partial key, this function removes the last shard from
/// the database (if any), appends the new indices to it, chunks the resulting integer list and
/// inserts the new shards back into the database.
///
/// This function is used by history indexing stages.
fn append_history_index<P, T>(
&self,
index_updates: BTreeMap<P, Vec<u64>>,
mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
) -> Result<()>
where
P: Copy,
T: Table<Value = BlockNumberList>,
{
for (partial_key, indices) in index_updates {
let last_shard = self.take_shard::<T>(sharded_key_factory(partial_key, u64::MAX))?;
// chunk indices and insert them in shards of N size.
let indices = last_shard.iter().chain(indices.iter());
let chunks = indices
.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD)
.into_iter()
.map(|chunks| chunks.map(|i| *i as usize).collect::<Vec<usize>>())
.collect::<Vec<_>>();
let mut chunks = chunks.into_iter().peekable();
while let Some(list) = chunks.next() {
let highest_block_number = if chunks.peek().is_some() {
*list.last().expect("`chunks` does not return empty list") as u64
} else {
// Insert last list with u64::MAX
u64::MAX
};
self.tx.put::<T>(
sharded_key_factory(partial_key, highest_block_number),
BlockNumberList::new_pre_sorted(list),
)?;
}
}
Ok(())
}
/// Append blocks and insert its post state.
/// This will insert block data to all related tables and will update pipeline progress.
pub fn append_blocks_with_post_state(
@ -1592,91 +1637,19 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HistoryWriter for DatabaseProvider
&self,
storage_transitions: BTreeMap<(Address, H256), Vec<u64>>,
) -> Result<()> {
for ((address, storage_key), mut indices) in storage_transitions {
let mut last_shard = self.take_last_storage_shard(address, storage_key)?;
last_shard.append(&mut indices);
// chunk indices and insert them in shards of N size.
let mut chunks = last_shard
.iter()
.chunks(storage_sharded_key::NUM_OF_INDICES_IN_SHARD)
.into_iter()
.map(|chunks| chunks.map(|i| *i as usize).collect::<Vec<usize>>())
.collect::<Vec<_>>();
let last_chunk = chunks.pop();
// chunk indices and insert them in shards of N size.
chunks.into_iter().try_for_each(|list| {
self.tx.put::<tables::StorageHistory>(
StorageShardedKey::new(
address,
storage_key,
*list.last().expect("Chuck does not return empty list") as BlockNumber,
),
BlockNumberList::new(list).expect("Indices are presorted and not empty"),
)
})?;
// Insert last list with u64::MAX
if let Some(last_list) = last_chunk {
self.tx.put::<tables::StorageHistory>(
StorageShardedKey::new(address, storage_key, u64::MAX),
BlockNumberList::new(last_list).expect("Indices are presorted and not empty"),
)?;
}
}
Ok(())
self.append_history_index::<_, tables::StorageHistory>(
storage_transitions,
|(address, storage_key), highest_block_number| {
StorageShardedKey::new(address, storage_key, highest_block_number)
},
)
}
fn insert_account_history_index(
&self,
account_transitions: BTreeMap<Address, Vec<u64>>,
) -> Result<()> {
// insert indexes to AccountHistory.
for (address, mut indices) in account_transitions {
// Load last shard and check if it is full and remove if it is not. If list is empty,
// last shard was full or there is no shards at all.
let mut last_shard = {
let mut cursor = self.tx.cursor_read::<tables::AccountHistory>()?;
let last = cursor.seek_exact(ShardedKey::new(address, u64::MAX))?;
if let Some((shard_key, list)) = last {
// delete old shard so new one can be inserted.
self.tx.delete::<tables::AccountHistory>(shard_key, None)?;
let list = list.iter(0).map(|i| i as u64).collect::<Vec<_>>();
list
} else {
Vec::new()
}
};
last_shard.append(&mut indices);
// chunk indices and insert them in shards of N size.
let mut chunks = last_shard
.iter()
.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD)
.into_iter()
.map(|chunks| chunks.map(|i| *i as usize).collect::<Vec<usize>>())
.collect::<Vec<_>>();
let last_chunk = chunks.pop();
chunks.into_iter().try_for_each(|list| {
self.tx.put::<tables::AccountHistory>(
ShardedKey::new(
address,
*list.last().expect("Chuck does not return empty list") as BlockNumber,
),
BlockNumberList::new(list).expect("Indices are presorted and not empty"),
)
})?;
// Insert last list with u64::MAX
if let Some(last_list) = last_chunk {
self.tx.put::<tables::AccountHistory>(
ShardedKey::new(address, u64::MAX),
BlockNumberList::new(last_list).expect("Indices are presorted and not empty"),
)?
}
}
Ok(())
self.append_history_index::<_, tables::AccountHistory>(account_transitions, ShardedKey::new)
}
fn unwind_storage_history_indices(&self, range: Range<BlockNumberAddress>) -> Result<usize> {