diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index b6e436a97..7ff06228a 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -1574,15 +1574,6 @@ impl StageCheckpointReader for DatabaseProvider { } impl StageCheckpointWriter for DatabaseProvider { - /// Save stage checkpoint progress. - fn save_stage_checkpoint_progress( - &self, - id: StageId, - checkpoint: Vec, - ) -> ProviderResult<()> { - Ok(self.tx.put::(id.to_string(), checkpoint)?) - } - /// Save stage checkpoint. fn save_stage_checkpoint( &self, @@ -1592,6 +1583,15 @@ impl StageCheckpointWriter for DatabaseProvider { Ok(self.tx.put::(id.to_string(), checkpoint)?) } + /// Save stage checkpoint progress. + fn save_stage_checkpoint_progress( + &self, + id: StageId, + checkpoint: Vec, + ) -> ProviderResult<()> { + Ok(self.tx.put::(id.to_string(), checkpoint)?) + } + fn update_pipeline_stages( &self, block_number: BlockNumber, @@ -1678,76 +1678,73 @@ impl StorageReader for DatabaseProvider { } impl HashingWriter for DatabaseProvider { - fn insert_hashes( + fn unwind_account_hashing( &self, range: RangeInclusive, - end_block_hash: B256, - expected_state_root: B256, - ) -> ProviderResult<()> { - // Initialize prefix sets. - let mut account_prefix_set = PrefixSetMut::default(); - let mut storage_prefix_set: HashMap = HashMap::default(); - let mut destroyed_accounts = HashSet::default(); + ) -> ProviderResult>> { + let mut hashed_accounts_cursor = self.tx.cursor_write::()?; - let mut durations_recorder = metrics::DurationsRecorder::default(); + // Aggregate all block changesets and make a list of accounts that have been changed. + let hashed_accounts = self + .tx + .cursor_read::()? + .walk_range(range)? + .collect::, _>>()? + .into_iter() + .rev() + // fold all account to get the old balance/nonces and account that needs to be removed + .fold( + BTreeMap::new(), + |mut accounts: BTreeMap>, (_, account_before)| { + accounts.insert(account_before.address, account_before.info); + accounts + }, + ) + .into_iter() + // hash addresses and collect it inside sorted BTreeMap. + // We are doing keccak only once per address. + .map(|(address, account)| (keccak256(address), account)) + .collect::>(); - // storage hashing stage - { - let lists = self.changed_storages_with_range(range.clone())?; - let storages = self.plain_state_storages(lists)?; - let storage_entries = self.insert_storage_for_hashing(storages)?; - for (hashed_address, hashed_slots) in storage_entries { - account_prefix_set.insert(Nibbles::unpack(hashed_address)); - for slot in hashed_slots { - storage_prefix_set - .entry(hashed_address) - .or_default() - .insert(Nibbles::unpack(slot)); + hashed_accounts + .iter() + // Apply values to HashedState (if Account is None remove it); + .try_for_each(|(hashed_address, account)| -> ProviderResult<()> { + if let Some(account) = account { + hashed_accounts_cursor.upsert(*hashed_address, *account)?; + } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() { + hashed_accounts_cursor.delete_current()?; } + Ok(()) + })?; + + Ok(hashed_accounts) + } + + fn insert_account_for_hashing( + &self, + accounts: impl IntoIterator)>, + ) -> ProviderResult>> { + let mut hashed_accounts_cursor = self.tx.cursor_write::()?; + + let hashed_accounts = accounts.into_iter().fold( + BTreeMap::new(), + |mut map: BTreeMap>, (address, account)| { + map.insert(keccak256(address), account); + map + }, + ); + + hashed_accounts.iter().try_for_each(|(hashed_address, account)| -> ProviderResult<()> { + if let Some(account) = account { + hashed_accounts_cursor.upsert(*hashed_address, *account)? + } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() { + hashed_accounts_cursor.delete_current()?; } - } - durations_recorder.record_relative(metrics::Action::InsertStorageHashing); + Ok(()) + })?; - // account hashing stage - { - let lists = self.changed_accounts_with_range(range.clone())?; - let accounts = self.basic_accounts(lists)?; - let hashed_addresses = self.insert_account_for_hashing(accounts)?; - for (hashed_address, account) in hashed_addresses { - account_prefix_set.insert(Nibbles::unpack(hashed_address)); - if account.is_none() { - destroyed_accounts.insert(hashed_address); - } - } - } - durations_recorder.record_relative(metrics::Action::InsertAccountHashing); - - // merkle tree - { - // This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets - // are pre-loaded. - let (state_root, trie_updates) = StateRoot::new(&self.tx) - .with_changed_account_prefixes(account_prefix_set.freeze()) - .with_changed_storage_prefixes( - storage_prefix_set.into_iter().map(|(k, v)| (k, v.freeze())).collect(), - ) - .with_destroyed_accounts(destroyed_accounts) - .root_with_updates() - .map_err(Into::::into)?; - if state_root != expected_state_root { - return Err(ProviderError::StateRootMismatch(Box::new(RootMismatch { - root: GotExpected { got: state_root, expected: expected_state_root }, - block_number: *range.end(), - block_hash: end_block_hash, - }))) - } - trie_updates.flush(&self.tx)?; - } - durations_recorder.record_relative(metrics::Action::InsertMerkleTree); - - debug!(target: "providers::db", ?range, actions = ?durations_recorder.actions, "Inserted hashes"); - - Ok(()) + Ok(hashed_accounts) } fn unwind_storage_hashing( @@ -1848,73 +1845,76 @@ impl HashingWriter for DatabaseProvider { Ok(hashed_storage_keys) } - fn unwind_account_hashing( + fn insert_hashes( &self, range: RangeInclusive, - ) -> ProviderResult>> { - let mut hashed_accounts_cursor = self.tx.cursor_write::()?; + end_block_hash: B256, + expected_state_root: B256, + ) -> ProviderResult<()> { + // Initialize prefix sets. + let mut account_prefix_set = PrefixSetMut::default(); + let mut storage_prefix_set: HashMap = HashMap::default(); + let mut destroyed_accounts = HashSet::default(); - // Aggregate all block changesets and make a list of accounts that have been changed. - let hashed_accounts = self - .tx - .cursor_read::()? - .walk_range(range)? - .collect::, _>>()? - .into_iter() - .rev() - // fold all account to get the old balance/nonces and account that needs to be removed - .fold( - BTreeMap::new(), - |mut accounts: BTreeMap>, (_, account_before)| { - accounts.insert(account_before.address, account_before.info); - accounts - }, - ) - .into_iter() - // hash addresses and collect it inside sorted BTreeMap. - // We are doing keccak only once per address. - .map(|(address, account)| (keccak256(address), account)) - .collect::>(); + let mut durations_recorder = metrics::DurationsRecorder::default(); - hashed_accounts - .iter() - // Apply values to HashedState (if Account is None remove it); - .try_for_each(|(hashed_address, account)| -> ProviderResult<()> { - if let Some(account) = account { - hashed_accounts_cursor.upsert(*hashed_address, *account)?; - } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() { - hashed_accounts_cursor.delete_current()?; + // storage hashing stage + { + let lists = self.changed_storages_with_range(range.clone())?; + let storages = self.plain_state_storages(lists)?; + let storage_entries = self.insert_storage_for_hashing(storages)?; + for (hashed_address, hashed_slots) in storage_entries { + account_prefix_set.insert(Nibbles::unpack(hashed_address)); + for slot in hashed_slots { + storage_prefix_set + .entry(hashed_address) + .or_default() + .insert(Nibbles::unpack(slot)); } - Ok(()) - })?; - - Ok(hashed_accounts) - } - - fn insert_account_for_hashing( - &self, - accounts: impl IntoIterator)>, - ) -> ProviderResult>> { - let mut hashed_accounts_cursor = self.tx.cursor_write::()?; - - let hashed_accounts = accounts.into_iter().fold( - BTreeMap::new(), - |mut map: BTreeMap>, (address, account)| { - map.insert(keccak256(address), account); - map - }, - ); - - hashed_accounts.iter().try_for_each(|(hashed_address, account)| -> ProviderResult<()> { - if let Some(account) = account { - hashed_accounts_cursor.upsert(*hashed_address, *account)? - } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() { - hashed_accounts_cursor.delete_current()?; } - Ok(()) - })?; + } + durations_recorder.record_relative(metrics::Action::InsertStorageHashing); - Ok(hashed_accounts) + // account hashing stage + { + let lists = self.changed_accounts_with_range(range.clone())?; + let accounts = self.basic_accounts(lists)?; + let hashed_addresses = self.insert_account_for_hashing(accounts)?; + for (hashed_address, account) in hashed_addresses { + account_prefix_set.insert(Nibbles::unpack(hashed_address)); + if account.is_none() { + destroyed_accounts.insert(hashed_address); + } + } + } + durations_recorder.record_relative(metrics::Action::InsertAccountHashing); + + // merkle tree + { + // This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets + // are pre-loaded. + let (state_root, trie_updates) = StateRoot::new(&self.tx) + .with_changed_account_prefixes(account_prefix_set.freeze()) + .with_changed_storage_prefixes( + storage_prefix_set.into_iter().map(|(k, v)| (k, v.freeze())).collect(), + ) + .with_destroyed_accounts(destroyed_accounts) + .root_with_updates() + .map_err(Into::::into)?; + if state_root != expected_state_root { + return Err(ProviderError::StateRootMismatch(Box::new(RootMismatch { + root: GotExpected { got: state_root, expected: expected_state_root }, + block_number: *range.end(), + block_hash: end_block_hash, + }))) + } + trie_updates.flush(&self.tx)?; + } + durations_recorder.record_relative(metrics::Action::InsertMerkleTree); + + debug!(target: "providers::db", ?range, actions = ?durations_recorder.actions, "Inserted hashes"); + + Ok(()) } }