From 96abde09654c7abe79cbae7c68dd0ad7a9bec90b Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Mon, 19 Jun 2023 12:46:47 +0100 Subject: [PATCH] chore: `AccountProvider` -> `AccountReader` & `AccountWriter` (#3228) --- crates/consensus/common/src/validation.rs | 8 +- crates/revm/src/executor.rs | 4 +- crates/rpc/rpc/src/eth/api/state.rs | 2 +- crates/staged-sync/src/utils/init.rs | 4 +- crates/stages/src/stages/execution.rs | 4 +- crates/stages/src/stages/hashing_account.rs | 2 +- .../src/stages/index_account_history.rs | 4 +- crates/storage/provider/src/lib.rs | 4 +- crates/storage/provider/src/post_state/mod.rs | 2 +- .../src/providers/database/provider.rs | 410 ++++++++---------- .../src/providers/post_state_provider.rs | 6 +- .../src/providers/state/historical.rs | 8 +- .../provider/src/providers/state/latest.rs | 6 +- .../provider/src/providers/state/macros.rs | 4 +- .../storage/provider/src/test_utils/mock.rs | 4 +- .../storage/provider/src/test_utils/noop.rs | 4 +- crates/storage/provider/src/traits/account.rs | 48 +- crates/storage/provider/src/traits/mod.rs | 2 +- crates/storage/provider/src/traits/state.rs | 4 +- crates/transaction-pool/src/validate.rs | 2 +- 20 files changed, 259 insertions(+), 273 deletions(-) diff --git a/crates/consensus/common/src/validation.rs b/crates/consensus/common/src/validation.rs index 6eff13b20..f61e160f0 100644 --- a/crates/consensus/common/src/validation.rs +++ b/crates/consensus/common/src/validation.rs @@ -4,7 +4,7 @@ use reth_primitives::{ constants, BlockNumber, ChainSpec, Hardfork, Header, InvalidTransactionError, SealedBlock, SealedHeader, Transaction, TransactionSignedEcRecovered, TxEip1559, TxEip2930, TxLegacy, }; -use reth_provider::{AccountProvider, HeaderProvider, WithdrawalsProvider}; +use reth_provider::{AccountReader, HeaderProvider, WithdrawalsProvider}; use std::{ collections::{hash_map::Entry, HashMap}, time::SystemTime, @@ -120,7 +120,7 @@ pub fn validate_transaction_regarding_header( /// There is no gas check done as [REVM](https://github.com/bluealloy/revm/blob/fd0108381799662098b7ab2c429ea719d6dfbf28/crates/revm/src/evm_impl.rs#L113-L131) already checks that. pub fn validate_all_transaction_regarding_block_and_nonces< 'a, - Provider: HeaderProvider + AccountProvider, + Provider: HeaderProvider + AccountReader, >( transactions: impl Iterator, header: &Header, @@ -363,7 +363,7 @@ pub fn validate_block_regarding_chain( +pub fn full_validation( block: &SealedBlock, provider: Provider, chain_spec: &ChainSpec, @@ -444,7 +444,7 @@ mod tests { } } - impl AccountProvider for Provider { + impl AccountReader for Provider { fn basic_account(&self, _address: Address) -> Result> { Ok(self.account) } diff --git a/crates/revm/src/executor.rs b/crates/revm/src/executor.rs index d9ca09448..8f5fcc3e4 100644 --- a/crates/revm/src/executor.rs +++ b/crates/revm/src/executor.rs @@ -654,7 +654,7 @@ mod tests { }; use reth_provider::{ post_state::{AccountChanges, Storage, StorageTransition, StorageWipe}, - AccountProvider, BlockHashProvider, StateProvider, StateRootProvider, + AccountReader, BlockHashProvider, StateProvider, StateRootProvider, }; use reth_rlp::Decodable; use std::{collections::HashMap, str::FromStr}; @@ -693,7 +693,7 @@ mod tests { } } - impl AccountProvider for StateProviderTest { + impl AccountReader for StateProviderTest { fn basic_account(&self, address: Address) -> reth_interfaces::Result> { let ret = Ok(self.accounts.get(&address).map(|(_, acc)| *acc)); ret diff --git a/crates/rpc/rpc/src/eth/api/state.rs b/crates/rpc/rpc/src/eth/api/state.rs index 9a9669c3f..4785ea384 100644 --- a/crates/rpc/rpc/src/eth/api/state.rs +++ b/crates/rpc/rpc/src/eth/api/state.rs @@ -9,7 +9,7 @@ use reth_primitives::{ U256, }; use reth_provider::{ - AccountProvider, BlockProviderIdExt, EvmEnvProvider, StateProvider, StateProviderFactory, + AccountReader, BlockProviderIdExt, EvmEnvProvider, StateProvider, StateProviderFactory, }; use reth_rpc_types::{EIP1186AccountProofResponse, StorageProof}; use reth_transaction_pool::{PoolTransaction, TransactionPool}; diff --git a/crates/staged-sync/src/utils/init.rs b/crates/staged-sync/src/utils/init.rs index 4f6eebb46..088d9901e 100644 --- a/crates/staged-sync/src/utils/init.rs +++ b/crates/staged-sync/src/utils/init.rs @@ -6,7 +6,9 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, }; use reth_primitives::{stage::StageId, Account, Bytecode, ChainSpec, H256, U256}; -use reth_provider::{DatabaseProviderRW, PostState, ProviderFactory, TransactionError}; +use reth_provider::{ + AccountWriter, DatabaseProviderRW, PostState, ProviderFactory, TransactionError, +}; use std::{path::Path, sync::Arc}; use tracing::debug; diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index b62031496..4dae22d19 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -422,9 +422,7 @@ mod tests { hex_literal::hex, keccak256, stage::StageUnitCheckpoint, Account, Bytecode, ChainSpecBuilder, SealedBlock, StorageEntry, H160, H256, MAINNET, U256, }; - use reth_provider::{ - insert_canonical_block, AccountProvider, ProviderFactory, ReceiptProvider, - }; + use reth_provider::{insert_canonical_block, AccountReader, ProviderFactory, ReceiptProvider}; use reth_revm::Factory; use reth_rlp::Decodable; use std::sync::Arc; diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index 0178a73fa..255a09aa1 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -16,7 +16,7 @@ use reth_primitives::{ StageId, }, }; -use reth_provider::{AccountExtProvider, DatabaseProviderRW}; +use reth_provider::{AccountExtReader, AccountWriter, DatabaseProviderRW}; use std::{ cmp::max, fmt::Debug, diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index df087a48c..30bc39aec 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -1,7 +1,7 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use reth_db::database::Database; use reth_primitives::stage::{StageCheckpoint, StageId}; -use reth_provider::DatabaseProviderRW; +use reth_provider::{AccountExtReader, AccountWriter, DatabaseProviderRW}; use std::fmt::Debug; /// Stage is indexing history the account changesets generated in @@ -46,7 +46,7 @@ impl Stage for IndexAccountHistoryStage { let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); - let indices = provider.get_account_block_numbers_from_changesets(range.clone())?; + let indices = provider.changed_accounts_and_blocks_with_range(range.clone())?; // Insert changeset to history index provider.insert_account_history_index(indices)?; diff --git a/crates/storage/provider/src/lib.rs b/crates/storage/provider/src/lib.rs index 00b4f022a..c290ed61e 100644 --- a/crates/storage/provider/src/lib.rs +++ b/crates/storage/provider/src/lib.rs @@ -11,8 +11,8 @@ /// Various provider traits. mod traits; pub use traits::{ - AccountExtProvider, AccountProvider, BlockExecutor, BlockHashProvider, BlockIdProvider, - BlockNumProvider, BlockProvider, BlockProviderIdExt, BlockSource, + AccountExtReader, AccountReader, AccountWriter, BlockExecutor, BlockHashProvider, + BlockIdProvider, BlockNumProvider, BlockProvider, BlockProviderIdExt, BlockSource, BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications, CanonStateSubscriptions, EvmEnvProvider, ExecutorFactory, HeaderProvider, PostStateDataProvider, ReceiptProvider, ReceiptProviderIdExt, diff --git a/crates/storage/provider/src/post_state/mod.rs b/crates/storage/provider/src/post_state/mod.rs index f37d16128..e9eb216f1 100644 --- a/crates/storage/provider/src/post_state/mod.rs +++ b/crates/storage/provider/src/post_state/mod.rs @@ -640,7 +640,7 @@ impl PostState { #[cfg(test)] mod tests { use super::*; - use crate::{AccountProvider, ProviderFactory}; + use crate::{AccountReader, ProviderFactory}; use reth_db::{ database::Database, mdbx::{test_utils, Env, EnvKind, WriteMap}, diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index d4d3f332a..e92f079df 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -1,10 +1,10 @@ use crate::{ insert_canonical_block, post_state::StorageChangeset, - traits::{AccountExtProvider, BlockSource, ReceiptProvider, StageCheckpointWriter}, - AccountProvider, BlockHashProvider, BlockNumProvider, BlockProvider, EvmEnvProvider, - HeaderProvider, PostState, ProviderError, StageCheckpointReader, TransactionError, - TransactionsProvider, WithdrawalsProvider, + traits::{AccountExtReader, BlockSource, ReceiptProvider, StageCheckpointWriter}, + AccountReader, AccountWriter, BlockHashProvider, BlockNumProvider, BlockProvider, + EvmEnvProvider, HeaderProvider, PostState, ProviderError, StageCheckpointReader, + TransactionError, TransactionsProvider, WithdrawalsProvider, }; use itertools::{izip, Itertools}; use reth_db::{ @@ -109,7 +109,7 @@ fn unwind_account_history_shards<'a, TX: reth_db::transaction::DbTxMutGAT<'a>>( cursor: &mut >::CursorMut, address: Address, block_number: BlockNumber, -) -> std::result::Result, TransactionError> { +) -> Result> { let mut item = cursor.seek_exact(ShardedKey::new(address, u64::MAX))?; while let Some((sharded_key, list)) = item { @@ -276,56 +276,6 @@ impl<'this, TX: DbTx<'this>> DatabaseProvider<'this, TX> { Ok(storage_changeset_lists) } - - /// Get all block numbers where account got changed. - /// - /// NOTE: Get inclusive range of blocks. - pub fn get_account_block_numbers_from_changesets( - &self, - range: RangeInclusive, - ) -> std::result::Result>, TransactionError> { - let mut changeset_cursor = self.tx.cursor_read::()?; - - let account_transtions = changeset_cursor.walk_range(range)?.try_fold( - BTreeMap::new(), - |mut accounts: BTreeMap>, - entry| - -> std::result::Result<_, TransactionError> { - let (index, account) = entry?; - accounts.entry(account.address).or_default().push(index); - Ok(accounts) - }, - )?; - - Ok(account_transtions) - } - - /// Iterate over account changesets and return all account address that were changed. - pub fn get_addresses_of_changed_accounts( - &self, - range: RangeInclusive, - ) -> std::result::Result, TransactionError> { - self.tx.cursor_read::()?.walk_range(range)?.try_fold( - BTreeSet::new(), - |mut accounts: BTreeSet
, entry| { - let (_, account_before) = entry?; - accounts.insert(account_before.address); - Ok(accounts) - }, - ) - } - - /// Get plainstate account from iterator - pub fn get_plainstate_accounts( - &self, - iter: impl IntoIterator, - ) -> std::result::Result)>, TransactionError> { - let mut plain_accounts = self.tx.cursor_read::()?; - Ok(iter - .into_iter() - .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v)))) - .collect::, _>>()?) - } } impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { @@ -354,49 +304,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { self.get_take_block_and_execution_range::(chain_spec, range) } - /// Unwind and clear account hashing - pub fn unwind_account_hashing( - &self, - range: RangeInclusive, - ) -> std::result::Result<(), TransactionError> { - let mut hashed_accounts = self.tx.cursor_write::()?; - - // Aggregate all block changesets and make a list of accounts that have been changed. - self.tx - .cursor_read::()? - .walk_range(range)? - .collect::, _>>()? - .into_iter() - .rev() - // fold all account to get the old balance/nonces and account that needs to be removed - .fold( - BTreeMap::new(), - |mut accounts: BTreeMap>, (_, account_before)| { - accounts.insert(account_before.address, account_before.info); - accounts - }, - ) - .into_iter() - // hash addresses and collect it inside sorted BTreeMap. - // We are doing keccak only once per address. - .map(|(address, account)| (keccak256(address), account)) - .collect::>() - .into_iter() - // Apply values to HashedState (if Account is None remove it); - .try_for_each( - |(hashed_address, account)| -> std::result::Result<(), TransactionError> { - if let Some(account) = account { - hashed_accounts.upsert(hashed_address, account)?; - } else if hashed_accounts.seek_exact(hashed_address)?.is_some() { - hashed_accounts.delete_current()?; - } - Ok(()) - }, - )?; - - Ok(()) - } - /// Unwind and clear storage hashing pub fn unwind_storage_hashing( &self, @@ -447,49 +354,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { Ok(()) } - /// Unwind and clear account history indices. - /// - /// Returns number of changesets walked. - pub fn unwind_account_history_indices( - &self, - range: RangeInclusive, - ) -> std::result::Result { - let account_changeset = self - .tx - .cursor_read::()? - .walk_range(range)? - .collect::, _>>()?; - let changesets = account_changeset.len(); - - let last_indices = account_changeset - .into_iter() - // reverse so we can get lowest block number where we need to unwind account. - .rev() - // fold all account and get last block number - .fold(BTreeMap::new(), |mut accounts: BTreeMap, (index, account)| { - // we just need address and lowest block number. - accounts.insert(account.address, index); - accounts - }); - // try to unwind the index - let mut cursor = self.tx.cursor_write::()?; - for (address, rem_index) in last_indices { - let shard_part = unwind_account_history_shards::(&mut cursor, address, rem_index)?; - - // check last shard_part, if present, items needs to be reinserted. - if !shard_part.is_empty() { - // there are items in list - self.tx.put::( - ShardedKey::new(address, u64::MAX), - BlockNumberList::new(shard_part) - .expect("There is at least one element in list and it is sorted."), - )?; - } - } - - Ok(changesets) - } - /// Unwind and clear storage history indices. /// /// Returns number of changesets walked. @@ -1034,44 +898,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { Ok(()) } - /// Insert account change index to database. Used inside AccountHistoryIndex stage - pub fn insert_account_history_index( - &self, - account_transitions: BTreeMap>, - ) -> std::result::Result<(), TransactionError> { - // insert indexes to AccountHistory. - for (address, mut indices) in account_transitions { - let mut last_shard = self.take_last_account_shard(address)?; - last_shard.append(&mut indices); - // chunk indices and insert them in shards of N size. - let mut chunks = last_shard - .iter() - .chunks(sharded_key::NUM_OF_INDICES_IN_SHARD) - .into_iter() - .map(|chunks| chunks.map(|i| *i as usize).collect::>()) - .collect::>(); - let last_chunk = chunks.pop(); - - chunks.into_iter().try_for_each(|list| { - self.tx.put::( - ShardedKey::new( - address, - *list.last().expect("Chuck does not return empty list") as BlockNumber, - ), - BlockNumberList::new(list).expect("Indices are presorted and not empty"), - ) - })?; - // Insert last list with u64::MAX - if let Some(last_list) = last_chunk { - self.tx.put::( - ShardedKey::new(address, u64::MAX), - BlockNumberList::new(last_list).expect("Indices are presorted and not empty"), - )? - } - } - Ok(()) - } - /// Query the block body by number. pub fn block_body_indices( &self, @@ -1141,23 +967,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { Ok(()) } - /// Load last shard and check if it is full and remove if it is not. If list is empty, last - /// shard was full or there is no shards at all. - fn take_last_account_shard( - &self, - address: Address, - ) -> std::result::Result, TransactionError> { - let mut cursor = self.tx.cursor_read::()?; - let last = cursor.seek_exact(ShardedKey::new(address, u64::MAX))?; - if let Some((shard_key, list)) = last { - // delete old shard so new one can be inserted. - self.tx.delete::(shard_key, None)?; - let list = list.iter(0).map(|i| i as u64).collect::>(); - return Ok(list) - } - Ok(Vec::new()) - } - /// Load last shard and check if it is full and remove if it is not. If list is empty, last /// shard was full or there is no shards at all. pub fn take_last_storage_shard( @@ -1214,34 +1023,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { Ok(()) } - /// iterate over accounts and insert them to hashing table - pub fn insert_account_for_hashing( - &self, - accounts: impl IntoIterator)>, - ) -> std::result::Result<(), TransactionError> { - let mut hashed_accounts = self.tx.cursor_write::()?; - - let hashes_accounts = accounts.into_iter().fold( - BTreeMap::new(), - |mut map: BTreeMap>, (address, account)| { - map.insert(keccak256(address), account); - map - }, - ); - - hashes_accounts.into_iter().try_for_each( - |(hashed_address, account)| -> std::result::Result<(), TransactionError> { - if let Some(account) = account { - hashed_accounts.upsert(hashed_address, account)? - } else if hashed_accounts.seek_exact(hashed_address)?.is_some() { - hashed_accounts.delete_current()?; - } - Ok(()) - }, - )?; - Ok(()) - } - /// Append blocks and insert its post state. /// This will insert block data to all related tables and will update pipeline progress. pub fn append_blocks_with_post_state( @@ -1299,7 +1080,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { ) -> std::result::Result<(), TransactionError> { // account history stage { - let indices = self.get_account_block_numbers_from_changesets(range.clone())?; + let indices = self.changed_accounts_and_blocks_with_range(range.clone())?; self.insert_account_history_index(indices)?; } @@ -1333,8 +1114,8 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { // account hashing stage { - let lists = self.get_addresses_of_changed_accounts(range.clone())?; - let accounts = self.get_plainstate_accounts(lists.into_iter())?; + let lists = self.changed_accounts_with_range(range.clone())?; + let accounts = self.basic_accounts(lists.into_iter())?; self.insert_account_for_hashing(accounts.into_iter())?; } @@ -1356,13 +1137,13 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { } } -impl<'this, TX: DbTx<'this>> AccountProvider for DatabaseProvider<'this, TX> { +impl<'this, TX: DbTx<'this>> AccountReader for DatabaseProvider<'this, TX> { fn basic_account(&self, address: Address) -> Result> { Ok(self.tx.get::(address)?) } } -impl<'this, TX: DbTx<'this>> AccountExtProvider for DatabaseProvider<'this, TX> { +impl<'this, TX: DbTx<'this>> AccountExtReader for DatabaseProvider<'this, TX> { fn changed_accounts_with_range( &self, range: impl RangeBounds, @@ -1386,6 +1167,177 @@ impl<'this, TX: DbTx<'this>> AccountExtProvider for DatabaseProvider<'this, TX> .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v)))) .collect::, _>>()?) } + + fn changed_accounts_and_blocks_with_range( + &self, + range: RangeInclusive, + ) -> Result>> { + let mut changeset_cursor = self.tx.cursor_read::()?; + + let account_transitions = changeset_cursor.walk_range(range)?.try_fold( + BTreeMap::new(), + |mut accounts: BTreeMap>, entry| -> Result<_> { + let (index, account) = entry?; + accounts.entry(account.address).or_default().push(index); + Ok(accounts) + }, + )?; + + Ok(account_transitions) + } +} + +impl<'this, TX: DbTxMut<'this> + DbTx<'this>> AccountWriter for DatabaseProvider<'this, TX> { + fn unwind_account_hashing(&self, range: RangeInclusive) -> Result<()> { + let mut hashed_accounts = self.tx.cursor_write::()?; + + // Aggregate all block changesets and make a list of accounts that have been changed. + self.tx + .cursor_read::()? + .walk_range(range)? + .collect::, _>>()? + .into_iter() + .rev() + // fold all account to get the old balance/nonces and account that needs to be removed + .fold( + BTreeMap::new(), + |mut accounts: BTreeMap>, (_, account_before)| { + accounts.insert(account_before.address, account_before.info); + accounts + }, + ) + .into_iter() + // hash addresses and collect it inside sorted BTreeMap. + // We are doing keccak only once per address. + .map(|(address, account)| (keccak256(address), account)) + .collect::>() + .into_iter() + // Apply values to HashedState (if Account is None remove it); + .try_for_each(|(hashed_address, account)| -> Result<()> { + if let Some(account) = account { + hashed_accounts.upsert(hashed_address, account)?; + } else if hashed_accounts.seek_exact(hashed_address)?.is_some() { + hashed_accounts.delete_current()?; + } + Ok(()) + })?; + + Ok(()) + } + + fn unwind_account_history_indices(&self, range: RangeInclusive) -> Result { + let account_changeset = self + .tx + .cursor_read::()? + .walk_range(range)? + .collect::, _>>()?; + let changesets = account_changeset.len(); + + let last_indices = account_changeset + .into_iter() + // reverse so we can get lowest block number where we need to unwind account. + .rev() + // fold all account and get last block number + .fold(BTreeMap::new(), |mut accounts: BTreeMap, (index, account)| { + // we just need address and lowest block number. + accounts.insert(account.address, index); + accounts + }); + // try to unwind the index + let mut cursor = self.tx.cursor_write::()?; + for (address, rem_index) in last_indices { + let shard_part = unwind_account_history_shards::(&mut cursor, address, rem_index)?; + + // check last shard_part, if present, items needs to be reinserted. + if !shard_part.is_empty() { + // there are items in list + self.tx.put::( + ShardedKey::new(address, u64::MAX), + BlockNumberList::new(shard_part) + .expect("There is at least one element in list and it is sorted."), + )?; + } + } + + Ok(changesets) + } + + fn insert_account_history_index( + &self, + account_transitions: BTreeMap>, + ) -> Result<()> { + // insert indexes to AccountHistory. + for (address, mut indices) in account_transitions { + // Load last shard and check if it is full and remove if it is not. If list is empty, + // last shard was full or there is no shards at all. + let mut last_shard = { + let mut cursor = self.tx.cursor_read::()?; + let last = cursor.seek_exact(ShardedKey::new(address, u64::MAX))?; + if let Some((shard_key, list)) = last { + // delete old shard so new one can be inserted. + self.tx.delete::(shard_key, None)?; + let list = list.iter(0).map(|i| i as u64).collect::>(); + list + } else { + Vec::new() + } + }; + + last_shard.append(&mut indices); + // chunk indices and insert them in shards of N size. + let mut chunks = last_shard + .iter() + .chunks(sharded_key::NUM_OF_INDICES_IN_SHARD) + .into_iter() + .map(|chunks| chunks.map(|i| *i as usize).collect::>()) + .collect::>(); + let last_chunk = chunks.pop(); + + chunks.into_iter().try_for_each(|list| { + self.tx.put::( + ShardedKey::new( + address, + *list.last().expect("Chuck does not return empty list") as BlockNumber, + ), + BlockNumberList::new(list).expect("Indices are presorted and not empty"), + ) + })?; + + // Insert last list with u64::MAX + if let Some(last_list) = last_chunk { + self.tx.put::( + ShardedKey::new(address, u64::MAX), + BlockNumberList::new(last_list).expect("Indices are presorted and not empty"), + )? + } + } + Ok(()) + } + + fn insert_account_for_hashing( + &self, + accounts: impl IntoIterator)>, + ) -> Result<()> { + let mut hashed_accounts = self.tx.cursor_write::()?; + + let hashes_accounts = accounts.into_iter().fold( + BTreeMap::new(), + |mut map: BTreeMap>, (address, account)| { + map.insert(keccak256(address), account); + map + }, + ); + + hashes_accounts.into_iter().try_for_each(|(hashed_address, account)| -> Result<()> { + if let Some(account) = account { + hashed_accounts.upsert(hashed_address, account)? + } else if hashed_accounts.seek_exact(hashed_address)?.is_some() { + hashed_accounts.delete_current()?; + } + Ok(()) + })?; + Ok(()) + } } impl<'this, TX: DbTx<'this>> HeaderProvider for DatabaseProvider<'this, TX> { diff --git a/crates/storage/provider/src/providers/post_state_provider.rs b/crates/storage/provider/src/providers/post_state_provider.rs index b2c6e5e99..065e5a1ef 100644 --- a/crates/storage/provider/src/providers/post_state_provider.rs +++ b/crates/storage/provider/src/providers/post_state_provider.rs @@ -1,5 +1,5 @@ use crate::{ - AccountProvider, BlockHashProvider, PostState, PostStateDataProvider, StateProvider, + AccountReader, BlockHashProvider, PostState, PostStateDataProvider, StateProvider, StateRootProvider, }; use reth_interfaces::{provider::ProviderError, Result}; @@ -39,9 +39,7 @@ impl BlockHashProvider } } -impl AccountProvider - for PostStateProvider -{ +impl AccountReader for PostStateProvider { fn basic_account(&self, address: Address) -> Result> { if let Some(account) = self.post_state_data_provider.state().account(&address) { Ok(*account) diff --git a/crates/storage/provider/src/providers/state/historical.rs b/crates/storage/provider/src/providers/state/historical.rs index 5bf571c99..686dd0469 100644 --- a/crates/storage/provider/src/providers/state/historical.rs +++ b/crates/storage/provider/src/providers/state/historical.rs @@ -1,6 +1,6 @@ use crate::{ - providers::state::macros::delegate_provider_impls, AccountProvider, BlockHashProvider, - PostState, ProviderError, StateProvider, StateRootProvider, + providers::state::macros::delegate_provider_impls, AccountReader, BlockHashProvider, PostState, + ProviderError, StateProvider, StateRootProvider, }; use reth_db::{ cursor::{DbCursorRO, DbDupCursorRO}, @@ -102,7 +102,7 @@ impl<'a, 'b, TX: DbTx<'a>> HistoricalStateProviderRef<'a, 'b, TX> { } } -impl<'a, 'b, TX: DbTx<'a>> AccountProvider for HistoricalStateProviderRef<'a, 'b, TX> { +impl<'a, 'b, TX: DbTx<'a>> AccountReader for HistoricalStateProviderRef<'a, 'b, TX> { /// Get basic account information. fn basic_account(&self, address: Address) -> Result> { match self.account_history_lookup(address)? { @@ -219,7 +219,7 @@ delegate_provider_impls!(HistoricalStateProvider<'a, TX> where [TX: DbTx<'a>]); #[cfg(test)] mod tests { use crate::{ - AccountProvider, HistoricalStateProvider, HistoricalStateProviderRef, StateProvider, + AccountReader, HistoricalStateProvider, HistoricalStateProviderRef, StateProvider, }; use reth_db::{ database::Database, diff --git a/crates/storage/provider/src/providers/state/latest.rs b/crates/storage/provider/src/providers/state/latest.rs index 63d264c46..4d53351cf 100644 --- a/crates/storage/provider/src/providers/state/latest.rs +++ b/crates/storage/provider/src/providers/state/latest.rs @@ -1,6 +1,6 @@ use crate::{ - providers::state::macros::delegate_provider_impls, AccountProvider, BlockHashProvider, - PostState, StateProvider, StateRootProvider, + providers::state::macros::delegate_provider_impls, AccountReader, BlockHashProvider, PostState, + StateProvider, StateRootProvider, }; use reth_db::{ cursor::{DbCursorRO, DbDupCursorRO}, @@ -28,7 +28,7 @@ impl<'a, 'b, TX: DbTx<'a>> LatestStateProviderRef<'a, 'b, TX> { } } -impl<'a, 'b, TX: DbTx<'a>> AccountProvider for LatestStateProviderRef<'a, 'b, TX> { +impl<'a, 'b, TX: DbTx<'a>> AccountReader for LatestStateProviderRef<'a, 'b, TX> { /// Get basic account information. fn basic_account(&self, address: Address) -> Result> { self.db.get::(address).map_err(Into::into) diff --git a/crates/storage/provider/src/providers/state/macros.rs b/crates/storage/provider/src/providers/state/macros.rs index 83c9c756d..299032e99 100644 --- a/crates/storage/provider/src/providers/state/macros.rs +++ b/crates/storage/provider/src/providers/state/macros.rs @@ -23,7 +23,7 @@ pub(crate) use delegate_impls_to_as_ref; /// Delegates the provider trait implementations to the `as_ref` function of the type: /// -/// [AccountProvider](crate::AccountProvider) +/// [AccountReader](crate::AccountReader) /// [BlockHashProvider](crate::BlockHashProvider) /// [StateProvider](crate::StateProvider) macro_rules! delegate_provider_impls { @@ -33,7 +33,7 @@ macro_rules! delegate_provider_impls { StateRootProvider $(where [$($generics)*])? { fn state_root(&self, state: crate::PostState) -> reth_interfaces::Result; } - AccountProvider $(where [$($generics)*])? { + AccountReader $(where [$($generics)*])? { fn basic_account(&self, address: reth_primitives::Address) -> reth_interfaces::Result>; } BlockHashProvider $(where [$($generics)*])? { diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs index 49cda80e9..4e09510e7 100644 --- a/crates/storage/provider/src/test_utils/mock.rs +++ b/crates/storage/provider/src/test_utils/mock.rs @@ -1,6 +1,6 @@ use crate::{ traits::{BlockSource, ReceiptProvider}, - AccountProvider, BlockHashProvider, BlockIdProvider, BlockNumProvider, BlockProvider, + AccountReader, BlockHashProvider, BlockIdProvider, BlockNumProvider, BlockProvider, BlockProviderIdExt, EvmEnvProvider, HeaderProvider, PostState, PostStateDataProvider, StateProvider, StateProviderBox, StateProviderFactory, StateRootProvider, TransactionsProvider, WithdrawalsProvider, @@ -362,7 +362,7 @@ impl BlockProviderIdExt for MockEthProvider { } } -impl AccountProvider for MockEthProvider { +impl AccountReader for MockEthProvider { fn basic_account(&self, address: Address) -> Result> { Ok(self.accounts.lock().get(&address).cloned().map(|a| a.account)) } diff --git a/crates/storage/provider/src/test_utils/noop.rs b/crates/storage/provider/src/test_utils/noop.rs index 9a72152cb..2edbde9ca 100644 --- a/crates/storage/provider/src/test_utils/noop.rs +++ b/crates/storage/provider/src/test_utils/noop.rs @@ -1,6 +1,6 @@ use crate::{ traits::{BlockSource, ReceiptProvider}, - AccountProvider, BlockHashProvider, BlockIdProvider, BlockNumProvider, BlockProvider, + AccountReader, BlockHashProvider, BlockIdProvider, BlockNumProvider, BlockProvider, BlockProviderIdExt, EvmEnvProvider, HeaderProvider, PostState, StageCheckpointReader, StateProvider, StateProviderBox, StateProviderFactory, StateRootProvider, TransactionsProvider, WithdrawalsProvider, @@ -212,7 +212,7 @@ impl HeaderProvider for NoopProvider { } } -impl AccountProvider for NoopProvider { +impl AccountReader for NoopProvider { fn basic_account(&self, _address: Address) -> Result> { Ok(None) } diff --git a/crates/storage/provider/src/traits/account.rs b/crates/storage/provider/src/traits/account.rs index 6a48104f5..ceafaec24 100644 --- a/crates/storage/provider/src/traits/account.rs +++ b/crates/storage/provider/src/traits/account.rs @@ -1,20 +1,23 @@ use auto_impl::auto_impl; use reth_interfaces::Result; use reth_primitives::{Account, Address, BlockNumber}; -use std::{collections::BTreeSet, ops::RangeBounds}; +use std::{ + collections::{BTreeMap, BTreeSet}, + ops::{RangeBounds, RangeInclusive}, +}; -/// Account provider +/// Account reader #[auto_impl(&, Arc, Box)] -pub trait AccountProvider: Send + Sync { +pub trait AccountReader: Send + Sync { /// Get basic account information. /// /// Returns `None` if the account doesn't exist. fn basic_account(&self, address: Address) -> Result>; } -/// Account provider +/// Account reader #[auto_impl(&, Arc, Box)] -pub trait AccountExtProvider: Send + Sync { +pub trait AccountExtReader: Send + Sync { /// Iterate over account changesets and return all account address that were changed. fn changed_accounts_with_range( &self, @@ -22,11 +25,44 @@ pub trait AccountExtProvider: Send + Sync { ) -> Result>; /// Get basic account information for multiple accounts. A more efficient version than calling - /// [`AccountProvider::basic_account`] repeatedly. + /// [`AccountReader::basic_account`] repeatedly. /// /// Returns `None` if the account doesn't exist. fn basic_accounts( &self, _iter: impl IntoIterator, ) -> Result)>>; + + /// Iterate over account changesets and return all account addresses that were changed alongside + /// each specific set of blocks. + /// + /// NOTE: Get inclusive range of blocks. + fn changed_accounts_and_blocks_with_range( + &self, + range: RangeInclusive, + ) -> Result>>; +} + +/// Account reader +#[auto_impl(&, Arc, Box)] +pub trait AccountWriter: Send + Sync { + /// Unwind and clear account hashing + fn unwind_account_hashing(&self, range: RangeInclusive) -> Result<()>; + + /// Unwind and clear account history indices. + /// + /// Returns number of changesets walked. + fn unwind_account_history_indices(&self, range: RangeInclusive) -> Result; + + /// Insert account change index to database. Used inside AccountHistoryIndex stage + fn insert_account_history_index( + &self, + account_transitions: BTreeMap>, + ) -> Result<()>; + + /// iterate over accounts and insert them to hashing table + fn insert_account_for_hashing( + &self, + accounts: impl IntoIterator)>, + ) -> Result<()>; } diff --git a/crates/storage/provider/src/traits/mod.rs b/crates/storage/provider/src/traits/mod.rs index d6564a87f..07b918a01 100644 --- a/crates/storage/provider/src/traits/mod.rs +++ b/crates/storage/provider/src/traits/mod.rs @@ -1,7 +1,7 @@ //! Collection of common provider traits. mod account; -pub use account::{AccountExtProvider, AccountProvider}; +pub use account::{AccountExtReader, AccountReader, AccountWriter}; mod block; pub use block::{BlockProvider, BlockProviderIdExt, BlockSource}; diff --git a/crates/storage/provider/src/traits/state.rs b/crates/storage/provider/src/traits/state.rs index bbd13ec3d..5e9dca995 100644 --- a/crates/storage/provider/src/traits/state.rs +++ b/crates/storage/provider/src/traits/state.rs @@ -1,4 +1,4 @@ -use super::AccountProvider; +use super::AccountReader; use crate::{post_state::PostState, BlockHashProvider, BlockIdProvider}; use auto_impl::auto_impl; use reth_interfaces::{provider::ProviderError, Result}; @@ -13,7 +13,7 @@ pub type StateProviderBox<'a> = Box; /// An abstraction for a type that provides state data. #[auto_impl(&, Arc, Box)] pub trait StateProvider: - BlockHashProvider + AccountProvider + StateRootProvider + Send + Sync + BlockHashProvider + AccountReader + StateRootProvider + Send + Sync { /// Get storage of given account. fn storage(&self, account: Address, storage_key: StorageKey) -> Result>; diff --git a/crates/transaction-pool/src/validate.rs b/crates/transaction-pool/src/validate.rs index f8705b324..d2343caaf 100644 --- a/crates/transaction-pool/src/validate.rs +++ b/crates/transaction-pool/src/validate.rs @@ -11,7 +11,7 @@ use reth_primitives::{ TransactionSignedEcRecovered, TxHash, EIP1559_TX_TYPE_ID, EIP2930_TX_TYPE_ID, LEGACY_TX_TYPE_ID, U256, }; -use reth_provider::{AccountProvider, StateProviderFactory}; +use reth_provider::{AccountReader, StateProviderFactory}; use std::{fmt, marker::PhantomData, sync::Arc, time::Instant}; /// A Result type returned after checking a transaction's validity.