From 9c1b48b9837518aca76356770c8fe0f0b1f66ed1 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Mon, 26 Jun 2023 16:40:54 +0300 Subject: [PATCH] chore(provider): simplify history inserts (#3356) --- .../src/providers/database/provider.rs | 151 +++++++----------- 1 file changed, 62 insertions(+), 89 deletions(-) diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 66a72d21d..f941f5ae9 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -12,9 +12,8 @@ use reth_db::{ cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}, database::{Database, DatabaseGAT}, models::{ - sharded_key, - storage_sharded_key::{self, StorageShardedKey}, - AccountBeforeTx, BlockNumberAddress, ShardedKey, StoredBlockBodyIndices, + sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress, + ShardedKey, StoredBlockBodyIndices, }, table::Table, tables, @@ -712,20 +711,66 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { Ok(()) } - /// Load last shard and check if it is full and remove if it is not. If list is empty, last - /// shard was full or there is no shards at all. - pub fn take_last_storage_shard(&self, address: Address, storage_key: H256) -> Result> { - let mut cursor = self.tx.cursor_read::()?; - let last = cursor.seek_exact(StorageShardedKey::new(address, storage_key, u64::MAX))?; - if let Some((storage_shard_key, list)) = last { + /// Load shard and remove it. If list is empty, last shard was full or + /// there are no shards at all. + fn take_shard(&self, key: T::Key) -> Result> + where + T: Table, + { + let mut cursor = self.tx.cursor_read::()?; + let shard = cursor.seek_exact(key)?; + if let Some((shard_key, list)) = shard { // delete old shard so new one can be inserted. - self.tx.delete::(storage_shard_key, None)?; + self.tx.delete::(shard_key, None)?; let list = list.iter(0).map(|i| i as u64).collect::>(); return Ok(list) } Ok(Vec::new()) } + /// Insert history index to the database. + /// + /// For each updated partial key, this function removes the last shard from + /// the database (if any), appends the new indices to it, chunks the resulting integer list and + /// inserts the new shards back into the database. + /// + /// This function is used by history indexing stages. + fn append_history_index( + &self, + index_updates: BTreeMap>, + mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key, + ) -> Result<()> + where + P: Copy, + T: Table, + { + for (partial_key, indices) in index_updates { + let last_shard = self.take_shard::(sharded_key_factory(partial_key, u64::MAX))?; + // chunk indices and insert them in shards of N size. + let indices = last_shard.iter().chain(indices.iter()); + let chunks = indices + .chunks(sharded_key::NUM_OF_INDICES_IN_SHARD) + .into_iter() + .map(|chunks| chunks.map(|i| *i as usize).collect::>()) + .collect::>(); + + let mut chunks = chunks.into_iter().peekable(); + while let Some(list) = chunks.next() { + let highest_block_number = if chunks.peek().is_some() { + *list.last().expect("`chunks` does not return empty list") as u64 + } else { + // Insert last list with u64::MAX + u64::MAX + }; + self.tx.put::( + sharded_key_factory(partial_key, highest_block_number), + BlockNumberList::new_pre_sorted(list), + )?; + } + } + Ok(()) + } + /// Append blocks and insert its post state. /// This will insert block data to all related tables and will update pipeline progress. pub fn append_blocks_with_post_state( @@ -1592,91 +1637,19 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HistoryWriter for DatabaseProvider &self, storage_transitions: BTreeMap<(Address, H256), Vec>, ) -> Result<()> { - for ((address, storage_key), mut indices) in storage_transitions { - let mut last_shard = self.take_last_storage_shard(address, storage_key)?; - last_shard.append(&mut indices); - - // chunk indices and insert them in shards of N size. - let mut chunks = last_shard - .iter() - .chunks(storage_sharded_key::NUM_OF_INDICES_IN_SHARD) - .into_iter() - .map(|chunks| chunks.map(|i| *i as usize).collect::>()) - .collect::>(); - let last_chunk = chunks.pop(); - - // chunk indices and insert them in shards of N size. - chunks.into_iter().try_for_each(|list| { - self.tx.put::( - StorageShardedKey::new( - address, - storage_key, - *list.last().expect("Chuck does not return empty list") as BlockNumber, - ), - BlockNumberList::new(list).expect("Indices are presorted and not empty"), - ) - })?; - // Insert last list with u64::MAX - if let Some(last_list) = last_chunk { - self.tx.put::( - StorageShardedKey::new(address, storage_key, u64::MAX), - BlockNumberList::new(last_list).expect("Indices are presorted and not empty"), - )?; - } - } - Ok(()) + self.append_history_index::<_, tables::StorageHistory>( + storage_transitions, + |(address, storage_key), highest_block_number| { + StorageShardedKey::new(address, storage_key, highest_block_number) + }, + ) } fn insert_account_history_index( &self, account_transitions: BTreeMap>, ) -> Result<()> { - // insert indexes to AccountHistory. - for (address, mut indices) in account_transitions { - // Load last shard and check if it is full and remove if it is not. If list is empty, - // last shard was full or there is no shards at all. - let mut last_shard = { - let mut cursor = self.tx.cursor_read::()?; - let last = cursor.seek_exact(ShardedKey::new(address, u64::MAX))?; - if let Some((shard_key, list)) = last { - // delete old shard so new one can be inserted. - self.tx.delete::(shard_key, None)?; - let list = list.iter(0).map(|i| i as u64).collect::>(); - list - } else { - Vec::new() - } - }; - - last_shard.append(&mut indices); - // chunk indices and insert them in shards of N size. - let mut chunks = last_shard - .iter() - .chunks(sharded_key::NUM_OF_INDICES_IN_SHARD) - .into_iter() - .map(|chunks| chunks.map(|i| *i as usize).collect::>()) - .collect::>(); - let last_chunk = chunks.pop(); - - chunks.into_iter().try_for_each(|list| { - self.tx.put::( - ShardedKey::new( - address, - *list.last().expect("Chuck does not return empty list") as BlockNumber, - ), - BlockNumberList::new(list).expect("Indices are presorted and not empty"), - ) - })?; - - // Insert last list with u64::MAX - if let Some(last_list) = last_chunk { - self.tx.put::( - ShardedKey::new(address, u64::MAX), - BlockNumberList::new(last_list).expect("Indices are presorted and not empty"), - )? - } - } - Ok(()) + self.append_history_index::<_, tables::AccountHistory>(account_transitions, ShardedKey::new) } fn unwind_storage_history_indices(&self, range: Range) -> Result {