chore: AccountProvider -> AccountReader & AccountWriter (#3228)

This commit is contained in:
joshieDo
2023-06-19 12:46:47 +01:00
committed by GitHub
parent 13dcfb8e6e
commit 96abde0965
20 changed files with 259 additions and 273 deletions

View File

@ -4,7 +4,7 @@ use reth_primitives::{
constants, BlockNumber, ChainSpec, Hardfork, Header, InvalidTransactionError, SealedBlock, constants, BlockNumber, ChainSpec, Hardfork, Header, InvalidTransactionError, SealedBlock,
SealedHeader, Transaction, TransactionSignedEcRecovered, TxEip1559, TxEip2930, TxLegacy, SealedHeader, Transaction, TransactionSignedEcRecovered, TxEip1559, TxEip2930, TxLegacy,
}; };
use reth_provider::{AccountProvider, HeaderProvider, WithdrawalsProvider}; use reth_provider::{AccountReader, HeaderProvider, WithdrawalsProvider};
use std::{ use std::{
collections::{hash_map::Entry, HashMap}, collections::{hash_map::Entry, HashMap},
time::SystemTime, time::SystemTime,
@ -120,7 +120,7 @@ pub fn validate_transaction_regarding_header(
/// There is no gas check done as [REVM](https://github.com/bluealloy/revm/blob/fd0108381799662098b7ab2c429ea719d6dfbf28/crates/revm/src/evm_impl.rs#L113-L131) already checks that. /// There is no gas check done as [REVM](https://github.com/bluealloy/revm/blob/fd0108381799662098b7ab2c429ea719d6dfbf28/crates/revm/src/evm_impl.rs#L113-L131) already checks that.
pub fn validate_all_transaction_regarding_block_and_nonces< pub fn validate_all_transaction_regarding_block_and_nonces<
'a, 'a,
Provider: HeaderProvider + AccountProvider, Provider: HeaderProvider + AccountReader,
>( >(
transactions: impl Iterator<Item = &'a TransactionSignedEcRecovered>, transactions: impl Iterator<Item = &'a TransactionSignedEcRecovered>,
header: &Header, header: &Header,
@ -363,7 +363,7 @@ pub fn validate_block_regarding_chain<PROV: HeaderProvider + WithdrawalsProvider
} }
/// Full validation of block before execution. /// Full validation of block before execution.
pub fn full_validation<Provider: HeaderProvider + AccountProvider + WithdrawalsProvider>( pub fn full_validation<Provider: HeaderProvider + AccountReader + WithdrawalsProvider>(
block: &SealedBlock, block: &SealedBlock,
provider: Provider, provider: Provider,
chain_spec: &ChainSpec, chain_spec: &ChainSpec,
@ -444,7 +444,7 @@ mod tests {
} }
} }
impl AccountProvider for Provider { impl AccountReader for Provider {
fn basic_account(&self, _address: Address) -> Result<Option<Account>> { fn basic_account(&self, _address: Address) -> Result<Option<Account>> {
Ok(self.account) Ok(self.account)
} }

View File

@ -654,7 +654,7 @@ mod tests {
}; };
use reth_provider::{ use reth_provider::{
post_state::{AccountChanges, Storage, StorageTransition, StorageWipe}, post_state::{AccountChanges, Storage, StorageTransition, StorageWipe},
AccountProvider, BlockHashProvider, StateProvider, StateRootProvider, AccountReader, BlockHashProvider, StateProvider, StateRootProvider,
}; };
use reth_rlp::Decodable; use reth_rlp::Decodable;
use std::{collections::HashMap, str::FromStr}; use std::{collections::HashMap, str::FromStr};
@ -693,7 +693,7 @@ mod tests {
} }
} }
impl AccountProvider for StateProviderTest { impl AccountReader for StateProviderTest {
fn basic_account(&self, address: Address) -> reth_interfaces::Result<Option<Account>> { fn basic_account(&self, address: Address) -> reth_interfaces::Result<Option<Account>> {
let ret = Ok(self.accounts.get(&address).map(|(_, acc)| *acc)); let ret = Ok(self.accounts.get(&address).map(|(_, acc)| *acc));
ret ret

View File

@ -9,7 +9,7 @@ use reth_primitives::{
U256, U256,
}; };
use reth_provider::{ use reth_provider::{
AccountProvider, BlockProviderIdExt, EvmEnvProvider, StateProvider, StateProviderFactory, AccountReader, BlockProviderIdExt, EvmEnvProvider, StateProvider, StateProviderFactory,
}; };
use reth_rpc_types::{EIP1186AccountProofResponse, StorageProof}; use reth_rpc_types::{EIP1186AccountProofResponse, StorageProof};
use reth_transaction_pool::{PoolTransaction, TransactionPool}; use reth_transaction_pool::{PoolTransaction, TransactionPool};

View File

@ -6,7 +6,9 @@ use reth_db::{
transaction::{DbTx, DbTxMut}, transaction::{DbTx, DbTxMut},
}; };
use reth_primitives::{stage::StageId, Account, Bytecode, ChainSpec, H256, U256}; use reth_primitives::{stage::StageId, Account, Bytecode, ChainSpec, H256, U256};
use reth_provider::{DatabaseProviderRW, PostState, ProviderFactory, TransactionError}; use reth_provider::{
AccountWriter, DatabaseProviderRW, PostState, ProviderFactory, TransactionError,
};
use std::{path::Path, sync::Arc}; use std::{path::Path, sync::Arc};
use tracing::debug; use tracing::debug;

View File

@ -422,9 +422,7 @@ mod tests {
hex_literal::hex, keccak256, stage::StageUnitCheckpoint, Account, Bytecode, hex_literal::hex, keccak256, stage::StageUnitCheckpoint, Account, Bytecode,
ChainSpecBuilder, SealedBlock, StorageEntry, H160, H256, MAINNET, U256, ChainSpecBuilder, SealedBlock, StorageEntry, H160, H256, MAINNET, U256,
}; };
use reth_provider::{ use reth_provider::{insert_canonical_block, AccountReader, ProviderFactory, ReceiptProvider};
insert_canonical_block, AccountProvider, ProviderFactory, ReceiptProvider,
};
use reth_revm::Factory; use reth_revm::Factory;
use reth_rlp::Decodable; use reth_rlp::Decodable;
use std::sync::Arc; use std::sync::Arc;

View File

@ -16,7 +16,7 @@ use reth_primitives::{
StageId, StageId,
}, },
}; };
use reth_provider::{AccountExtProvider, DatabaseProviderRW}; use reth_provider::{AccountExtReader, AccountWriter, DatabaseProviderRW};
use std::{ use std::{
cmp::max, cmp::max,
fmt::Debug, fmt::Debug,

View File

@ -1,7 +1,7 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_db::database::Database; use reth_db::database::Database;
use reth_primitives::stage::{StageCheckpoint, StageId}; use reth_primitives::stage::{StageCheckpoint, StageId};
use reth_provider::DatabaseProviderRW; use reth_provider::{AccountExtReader, AccountWriter, DatabaseProviderRW};
use std::fmt::Debug; use std::fmt::Debug;
/// Stage is indexing history the account changesets generated in /// Stage is indexing history the account changesets generated in
@ -46,7 +46,7 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
let indices = provider.get_account_block_numbers_from_changesets(range.clone())?; let indices = provider.changed_accounts_and_blocks_with_range(range.clone())?;
// Insert changeset to history index // Insert changeset to history index
provider.insert_account_history_index(indices)?; provider.insert_account_history_index(indices)?;

View File

@ -11,8 +11,8 @@
/// Various provider traits. /// Various provider traits.
mod traits; mod traits;
pub use traits::{ pub use traits::{
AccountExtProvider, AccountProvider, BlockExecutor, BlockHashProvider, BlockIdProvider, AccountExtReader, AccountReader, AccountWriter, BlockExecutor, BlockHashProvider,
BlockNumProvider, BlockProvider, BlockProviderIdExt, BlockSource, BlockIdProvider, BlockNumProvider, BlockProvider, BlockProviderIdExt, BlockSource,
BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotification, BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotification,
CanonStateNotificationSender, CanonStateNotifications, CanonStateSubscriptions, EvmEnvProvider, CanonStateNotificationSender, CanonStateNotifications, CanonStateSubscriptions, EvmEnvProvider,
ExecutorFactory, HeaderProvider, PostStateDataProvider, ReceiptProvider, ReceiptProviderIdExt, ExecutorFactory, HeaderProvider, PostStateDataProvider, ReceiptProvider, ReceiptProviderIdExt,

View File

@ -640,7 +640,7 @@ impl PostState {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::{AccountProvider, ProviderFactory}; use crate::{AccountReader, ProviderFactory};
use reth_db::{ use reth_db::{
database::Database, database::Database,
mdbx::{test_utils, Env, EnvKind, WriteMap}, mdbx::{test_utils, Env, EnvKind, WriteMap},

View File

@ -1,10 +1,10 @@
use crate::{ use crate::{
insert_canonical_block, insert_canonical_block,
post_state::StorageChangeset, post_state::StorageChangeset,
traits::{AccountExtProvider, BlockSource, ReceiptProvider, StageCheckpointWriter}, traits::{AccountExtReader, BlockSource, ReceiptProvider, StageCheckpointWriter},
AccountProvider, BlockHashProvider, BlockNumProvider, BlockProvider, EvmEnvProvider, AccountReader, AccountWriter, BlockHashProvider, BlockNumProvider, BlockProvider,
HeaderProvider, PostState, ProviderError, StageCheckpointReader, TransactionError, EvmEnvProvider, HeaderProvider, PostState, ProviderError, StageCheckpointReader,
TransactionsProvider, WithdrawalsProvider, TransactionError, TransactionsProvider, WithdrawalsProvider,
}; };
use itertools::{izip, Itertools}; use itertools::{izip, Itertools};
use reth_db::{ use reth_db::{
@ -109,7 +109,7 @@ fn unwind_account_history_shards<'a, TX: reth_db::transaction::DbTxMutGAT<'a>>(
cursor: &mut <TX as DbTxMutGAT<'a>>::CursorMut<tables::AccountHistory>, cursor: &mut <TX as DbTxMutGAT<'a>>::CursorMut<tables::AccountHistory>,
address: Address, address: Address,
block_number: BlockNumber, block_number: BlockNumber,
) -> std::result::Result<Vec<usize>, TransactionError> { ) -> Result<Vec<usize>> {
let mut item = cursor.seek_exact(ShardedKey::new(address, u64::MAX))?; let mut item = cursor.seek_exact(ShardedKey::new(address, u64::MAX))?;
while let Some((sharded_key, list)) = item { while let Some((sharded_key, list)) = item {
@ -276,56 +276,6 @@ impl<'this, TX: DbTx<'this>> DatabaseProvider<'this, TX> {
Ok(storage_changeset_lists) Ok(storage_changeset_lists)
} }
/// Get all block numbers where account got changed.
///
/// NOTE: Get inclusive range of blocks.
pub fn get_account_block_numbers_from_changesets(
&self,
range: RangeInclusive<BlockNumber>,
) -> std::result::Result<BTreeMap<Address, Vec<u64>>, TransactionError> {
let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSet>()?;
let account_transtions = changeset_cursor.walk_range(range)?.try_fold(
BTreeMap::new(),
|mut accounts: BTreeMap<Address, Vec<u64>>,
entry|
-> std::result::Result<_, TransactionError> {
let (index, account) = entry?;
accounts.entry(account.address).or_default().push(index);
Ok(accounts)
},
)?;
Ok(account_transtions)
}
/// Iterate over account changesets and return all account address that were changed.
pub fn get_addresses_of_changed_accounts(
&self,
range: RangeInclusive<BlockNumber>,
) -> std::result::Result<BTreeSet<Address>, TransactionError> {
self.tx.cursor_read::<tables::AccountChangeSet>()?.walk_range(range)?.try_fold(
BTreeSet::new(),
|mut accounts: BTreeSet<Address>, entry| {
let (_, account_before) = entry?;
accounts.insert(account_before.address);
Ok(accounts)
},
)
}
/// Get plainstate account from iterator
pub fn get_plainstate_accounts(
&self,
iter: impl IntoIterator<Item = Address>,
) -> std::result::Result<Vec<(Address, Option<Account>)>, TransactionError> {
let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
Ok(iter
.into_iter()
.map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
.collect::<std::result::Result<Vec<_>, _>>()?)
}
} }
impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
@ -354,49 +304,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
self.get_take_block_and_execution_range::<true>(chain_spec, range) self.get_take_block_and_execution_range::<true>(chain_spec, range)
} }
/// Unwind and clear account hashing
pub fn unwind_account_hashing(
&self,
range: RangeInclusive<BlockNumber>,
) -> std::result::Result<(), TransactionError> {
let mut hashed_accounts = self.tx.cursor_write::<tables::HashedAccount>()?;
// Aggregate all block changesets and make a list of accounts that have been changed.
self.tx
.cursor_read::<tables::AccountChangeSet>()?
.walk_range(range)?
.collect::<std::result::Result<Vec<_>, _>>()?
.into_iter()
.rev()
// fold all account to get the old balance/nonces and account that needs to be removed
.fold(
BTreeMap::new(),
|mut accounts: BTreeMap<Address, Option<Account>>, (_, account_before)| {
accounts.insert(account_before.address, account_before.info);
accounts
},
)
.into_iter()
// hash addresses and collect it inside sorted BTreeMap.
// We are doing keccak only once per address.
.map(|(address, account)| (keccak256(address), account))
.collect::<BTreeMap<_, _>>()
.into_iter()
// Apply values to HashedState (if Account is None remove it);
.try_for_each(
|(hashed_address, account)| -> std::result::Result<(), TransactionError> {
if let Some(account) = account {
hashed_accounts.upsert(hashed_address, account)?;
} else if hashed_accounts.seek_exact(hashed_address)?.is_some() {
hashed_accounts.delete_current()?;
}
Ok(())
},
)?;
Ok(())
}
/// Unwind and clear storage hashing /// Unwind and clear storage hashing
pub fn unwind_storage_hashing( pub fn unwind_storage_hashing(
&self, &self,
@ -447,49 +354,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
Ok(()) Ok(())
} }
/// Unwind and clear account history indices.
///
/// Returns number of changesets walked.
pub fn unwind_account_history_indices(
&self,
range: RangeInclusive<BlockNumber>,
) -> std::result::Result<usize, TransactionError> {
let account_changeset = self
.tx
.cursor_read::<tables::AccountChangeSet>()?
.walk_range(range)?
.collect::<std::result::Result<Vec<_>, _>>()?;
let changesets = account_changeset.len();
let last_indices = account_changeset
.into_iter()
// reverse so we can get lowest block number where we need to unwind account.
.rev()
// fold all account and get last block number
.fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, u64>, (index, account)| {
// we just need address and lowest block number.
accounts.insert(account.address, index);
accounts
});
// try to unwind the index
let mut cursor = self.tx.cursor_write::<tables::AccountHistory>()?;
for (address, rem_index) in last_indices {
let shard_part = unwind_account_history_shards::<TX>(&mut cursor, address, rem_index)?;
// check last shard_part, if present, items needs to be reinserted.
if !shard_part.is_empty() {
// there are items in list
self.tx.put::<tables::AccountHistory>(
ShardedKey::new(address, u64::MAX),
BlockNumberList::new(shard_part)
.expect("There is at least one element in list and it is sorted."),
)?;
}
}
Ok(changesets)
}
/// Unwind and clear storage history indices. /// Unwind and clear storage history indices.
/// ///
/// Returns number of changesets walked. /// Returns number of changesets walked.
@ -1034,44 +898,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
Ok(()) Ok(())
} }
/// Insert account change index to database. Used inside AccountHistoryIndex stage
pub fn insert_account_history_index(
&self,
account_transitions: BTreeMap<Address, Vec<u64>>,
) -> std::result::Result<(), TransactionError> {
// insert indexes to AccountHistory.
for (address, mut indices) in account_transitions {
let mut last_shard = self.take_last_account_shard(address)?;
last_shard.append(&mut indices);
// chunk indices and insert them in shards of N size.
let mut chunks = last_shard
.iter()
.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD)
.into_iter()
.map(|chunks| chunks.map(|i| *i as usize).collect::<Vec<usize>>())
.collect::<Vec<_>>();
let last_chunk = chunks.pop();
chunks.into_iter().try_for_each(|list| {
self.tx.put::<tables::AccountHistory>(
ShardedKey::new(
address,
*list.last().expect("Chuck does not return empty list") as BlockNumber,
),
BlockNumberList::new(list).expect("Indices are presorted and not empty"),
)
})?;
// Insert last list with u64::MAX
if let Some(last_list) = last_chunk {
self.tx.put::<tables::AccountHistory>(
ShardedKey::new(address, u64::MAX),
BlockNumberList::new(last_list).expect("Indices are presorted and not empty"),
)?
}
}
Ok(())
}
/// Query the block body by number. /// Query the block body by number.
pub fn block_body_indices( pub fn block_body_indices(
&self, &self,
@ -1141,23 +967,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
Ok(()) Ok(())
} }
/// Load last shard and check if it is full and remove if it is not. If list is empty, last
/// shard was full or there is no shards at all.
fn take_last_account_shard(
&self,
address: Address,
) -> std::result::Result<Vec<u64>, TransactionError> {
let mut cursor = self.tx.cursor_read::<tables::AccountHistory>()?;
let last = cursor.seek_exact(ShardedKey::new(address, u64::MAX))?;
if let Some((shard_key, list)) = last {
// delete old shard so new one can be inserted.
self.tx.delete::<tables::AccountHistory>(shard_key, None)?;
let list = list.iter(0).map(|i| i as u64).collect::<Vec<_>>();
return Ok(list)
}
Ok(Vec::new())
}
/// Load last shard and check if it is full and remove if it is not. If list is empty, last /// Load last shard and check if it is full and remove if it is not. If list is empty, last
/// shard was full or there is no shards at all. /// shard was full or there is no shards at all.
pub fn take_last_storage_shard( pub fn take_last_storage_shard(
@ -1214,34 +1023,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
Ok(()) Ok(())
} }
/// iterate over accounts and insert them to hashing table
pub fn insert_account_for_hashing(
&self,
accounts: impl IntoIterator<Item = (Address, Option<Account>)>,
) -> std::result::Result<(), TransactionError> {
let mut hashed_accounts = self.tx.cursor_write::<tables::HashedAccount>()?;
let hashes_accounts = accounts.into_iter().fold(
BTreeMap::new(),
|mut map: BTreeMap<H256, Option<Account>>, (address, account)| {
map.insert(keccak256(address), account);
map
},
);
hashes_accounts.into_iter().try_for_each(
|(hashed_address, account)| -> std::result::Result<(), TransactionError> {
if let Some(account) = account {
hashed_accounts.upsert(hashed_address, account)?
} else if hashed_accounts.seek_exact(hashed_address)?.is_some() {
hashed_accounts.delete_current()?;
}
Ok(())
},
)?;
Ok(())
}
/// Append blocks and insert its post state. /// Append blocks and insert its post state.
/// This will insert block data to all related tables and will update pipeline progress. /// This will insert block data to all related tables and will update pipeline progress.
pub fn append_blocks_with_post_state( pub fn append_blocks_with_post_state(
@ -1299,7 +1080,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
) -> std::result::Result<(), TransactionError> { ) -> std::result::Result<(), TransactionError> {
// account history stage // account history stage
{ {
let indices = self.get_account_block_numbers_from_changesets(range.clone())?; let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
self.insert_account_history_index(indices)?; self.insert_account_history_index(indices)?;
} }
@ -1333,8 +1114,8 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
// account hashing stage // account hashing stage
{ {
let lists = self.get_addresses_of_changed_accounts(range.clone())?; let lists = self.changed_accounts_with_range(range.clone())?;
let accounts = self.get_plainstate_accounts(lists.into_iter())?; let accounts = self.basic_accounts(lists.into_iter())?;
self.insert_account_for_hashing(accounts.into_iter())?; self.insert_account_for_hashing(accounts.into_iter())?;
} }
@ -1356,13 +1137,13 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
} }
} }
impl<'this, TX: DbTx<'this>> AccountProvider for DatabaseProvider<'this, TX> { impl<'this, TX: DbTx<'this>> AccountReader for DatabaseProvider<'this, TX> {
fn basic_account(&self, address: Address) -> Result<Option<Account>> { fn basic_account(&self, address: Address) -> Result<Option<Account>> {
Ok(self.tx.get::<tables::PlainAccountState>(address)?) Ok(self.tx.get::<tables::PlainAccountState>(address)?)
} }
} }
impl<'this, TX: DbTx<'this>> AccountExtProvider for DatabaseProvider<'this, TX> { impl<'this, TX: DbTx<'this>> AccountExtReader for DatabaseProvider<'this, TX> {
fn changed_accounts_with_range( fn changed_accounts_with_range(
&self, &self,
range: impl RangeBounds<BlockNumber>, range: impl RangeBounds<BlockNumber>,
@ -1386,6 +1167,177 @@ impl<'this, TX: DbTx<'this>> AccountExtProvider for DatabaseProvider<'this, TX>
.map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v)))) .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
.collect::<std::result::Result<Vec<_>, _>>()?) .collect::<std::result::Result<Vec<_>, _>>()?)
} }
fn changed_accounts_and_blocks_with_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> Result<BTreeMap<Address, Vec<u64>>> {
let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSet>()?;
let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
BTreeMap::new(),
|mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> Result<_> {
let (index, account) = entry?;
accounts.entry(account.address).or_default().push(index);
Ok(accounts)
},
)?;
Ok(account_transitions)
}
}
impl<'this, TX: DbTxMut<'this> + DbTx<'this>> AccountWriter for DatabaseProvider<'this, TX> {
fn unwind_account_hashing(&self, range: RangeInclusive<BlockNumber>) -> Result<()> {
let mut hashed_accounts = self.tx.cursor_write::<tables::HashedAccount>()?;
// Aggregate all block changesets and make a list of accounts that have been changed.
self.tx
.cursor_read::<tables::AccountChangeSet>()?
.walk_range(range)?
.collect::<std::result::Result<Vec<_>, _>>()?
.into_iter()
.rev()
// fold all account to get the old balance/nonces and account that needs to be removed
.fold(
BTreeMap::new(),
|mut accounts: BTreeMap<Address, Option<Account>>, (_, account_before)| {
accounts.insert(account_before.address, account_before.info);
accounts
},
)
.into_iter()
// hash addresses and collect it inside sorted BTreeMap.
// We are doing keccak only once per address.
.map(|(address, account)| (keccak256(address), account))
.collect::<BTreeMap<_, _>>()
.into_iter()
// Apply values to HashedState (if Account is None remove it);
.try_for_each(|(hashed_address, account)| -> Result<()> {
if let Some(account) = account {
hashed_accounts.upsert(hashed_address, account)?;
} else if hashed_accounts.seek_exact(hashed_address)?.is_some() {
hashed_accounts.delete_current()?;
}
Ok(())
})?;
Ok(())
}
fn unwind_account_history_indices(&self, range: RangeInclusive<BlockNumber>) -> Result<usize> {
let account_changeset = self
.tx
.cursor_read::<tables::AccountChangeSet>()?
.walk_range(range)?
.collect::<std::result::Result<Vec<_>, _>>()?;
let changesets = account_changeset.len();
let last_indices = account_changeset
.into_iter()
// reverse so we can get lowest block number where we need to unwind account.
.rev()
// fold all account and get last block number
.fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, u64>, (index, account)| {
// we just need address and lowest block number.
accounts.insert(account.address, index);
accounts
});
// try to unwind the index
let mut cursor = self.tx.cursor_write::<tables::AccountHistory>()?;
for (address, rem_index) in last_indices {
let shard_part = unwind_account_history_shards::<TX>(&mut cursor, address, rem_index)?;
// check last shard_part, if present, items needs to be reinserted.
if !shard_part.is_empty() {
// there are items in list
self.tx.put::<tables::AccountHistory>(
ShardedKey::new(address, u64::MAX),
BlockNumberList::new(shard_part)
.expect("There is at least one element in list and it is sorted."),
)?;
}
}
Ok(changesets)
}
fn insert_account_history_index(
&self,
account_transitions: BTreeMap<Address, Vec<u64>>,
) -> Result<()> {
// insert indexes to AccountHistory.
for (address, mut indices) in account_transitions {
// Load last shard and check if it is full and remove if it is not. If list is empty,
// last shard was full or there is no shards at all.
let mut last_shard = {
let mut cursor = self.tx.cursor_read::<tables::AccountHistory>()?;
let last = cursor.seek_exact(ShardedKey::new(address, u64::MAX))?;
if let Some((shard_key, list)) = last {
// delete old shard so new one can be inserted.
self.tx.delete::<tables::AccountHistory>(shard_key, None)?;
let list = list.iter(0).map(|i| i as u64).collect::<Vec<_>>();
list
} else {
Vec::new()
}
};
last_shard.append(&mut indices);
// chunk indices and insert them in shards of N size.
let mut chunks = last_shard
.iter()
.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD)
.into_iter()
.map(|chunks| chunks.map(|i| *i as usize).collect::<Vec<usize>>())
.collect::<Vec<_>>();
let last_chunk = chunks.pop();
chunks.into_iter().try_for_each(|list| {
self.tx.put::<tables::AccountHistory>(
ShardedKey::new(
address,
*list.last().expect("Chuck does not return empty list") as BlockNumber,
),
BlockNumberList::new(list).expect("Indices are presorted and not empty"),
)
})?;
// Insert last list with u64::MAX
if let Some(last_list) = last_chunk {
self.tx.put::<tables::AccountHistory>(
ShardedKey::new(address, u64::MAX),
BlockNumberList::new(last_list).expect("Indices are presorted and not empty"),
)?
}
}
Ok(())
}
fn insert_account_for_hashing(
&self,
accounts: impl IntoIterator<Item = (Address, Option<Account>)>,
) -> Result<()> {
let mut hashed_accounts = self.tx.cursor_write::<tables::HashedAccount>()?;
let hashes_accounts = accounts.into_iter().fold(
BTreeMap::new(),
|mut map: BTreeMap<H256, Option<Account>>, (address, account)| {
map.insert(keccak256(address), account);
map
},
);
hashes_accounts.into_iter().try_for_each(|(hashed_address, account)| -> Result<()> {
if let Some(account) = account {
hashed_accounts.upsert(hashed_address, account)?
} else if hashed_accounts.seek_exact(hashed_address)?.is_some() {
hashed_accounts.delete_current()?;
}
Ok(())
})?;
Ok(())
}
} }
impl<'this, TX: DbTx<'this>> HeaderProvider for DatabaseProvider<'this, TX> { impl<'this, TX: DbTx<'this>> HeaderProvider for DatabaseProvider<'this, TX> {

View File

@ -1,5 +1,5 @@
use crate::{ use crate::{
AccountProvider, BlockHashProvider, PostState, PostStateDataProvider, StateProvider, AccountReader, BlockHashProvider, PostState, PostStateDataProvider, StateProvider,
StateRootProvider, StateRootProvider,
}; };
use reth_interfaces::{provider::ProviderError, Result}; use reth_interfaces::{provider::ProviderError, Result};
@ -39,9 +39,7 @@ impl<SP: StateProvider, PSDP: PostStateDataProvider> BlockHashProvider
} }
} }
impl<SP: StateProvider, PSDP: PostStateDataProvider> AccountProvider impl<SP: StateProvider, PSDP: PostStateDataProvider> AccountReader for PostStateProvider<SP, PSDP> {
for PostStateProvider<SP, PSDP>
{
fn basic_account(&self, address: Address) -> Result<Option<Account>> { fn basic_account(&self, address: Address) -> Result<Option<Account>> {
if let Some(account) = self.post_state_data_provider.state().account(&address) { if let Some(account) = self.post_state_data_provider.state().account(&address) {
Ok(*account) Ok(*account)

View File

@ -1,6 +1,6 @@
use crate::{ use crate::{
providers::state::macros::delegate_provider_impls, AccountProvider, BlockHashProvider, providers::state::macros::delegate_provider_impls, AccountReader, BlockHashProvider, PostState,
PostState, ProviderError, StateProvider, StateRootProvider, ProviderError, StateProvider, StateRootProvider,
}; };
use reth_db::{ use reth_db::{
cursor::{DbCursorRO, DbDupCursorRO}, cursor::{DbCursorRO, DbDupCursorRO},
@ -102,7 +102,7 @@ impl<'a, 'b, TX: DbTx<'a>> HistoricalStateProviderRef<'a, 'b, TX> {
} }
} }
impl<'a, 'b, TX: DbTx<'a>> AccountProvider for HistoricalStateProviderRef<'a, 'b, TX> { impl<'a, 'b, TX: DbTx<'a>> AccountReader for HistoricalStateProviderRef<'a, 'b, TX> {
/// Get basic account information. /// Get basic account information.
fn basic_account(&self, address: Address) -> Result<Option<Account>> { fn basic_account(&self, address: Address) -> Result<Option<Account>> {
match self.account_history_lookup(address)? { match self.account_history_lookup(address)? {
@ -219,7 +219,7 @@ delegate_provider_impls!(HistoricalStateProvider<'a, TX> where [TX: DbTx<'a>]);
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::{ use crate::{
AccountProvider, HistoricalStateProvider, HistoricalStateProviderRef, StateProvider, AccountReader, HistoricalStateProvider, HistoricalStateProviderRef, StateProvider,
}; };
use reth_db::{ use reth_db::{
database::Database, database::Database,

View File

@ -1,6 +1,6 @@
use crate::{ use crate::{
providers::state::macros::delegate_provider_impls, AccountProvider, BlockHashProvider, providers::state::macros::delegate_provider_impls, AccountReader, BlockHashProvider, PostState,
PostState, StateProvider, StateRootProvider, StateProvider, StateRootProvider,
}; };
use reth_db::{ use reth_db::{
cursor::{DbCursorRO, DbDupCursorRO}, cursor::{DbCursorRO, DbDupCursorRO},
@ -28,7 +28,7 @@ impl<'a, 'b, TX: DbTx<'a>> LatestStateProviderRef<'a, 'b, TX> {
} }
} }
impl<'a, 'b, TX: DbTx<'a>> AccountProvider for LatestStateProviderRef<'a, 'b, TX> { impl<'a, 'b, TX: DbTx<'a>> AccountReader for LatestStateProviderRef<'a, 'b, TX> {
/// Get basic account information. /// Get basic account information.
fn basic_account(&self, address: Address) -> Result<Option<Account>> { fn basic_account(&self, address: Address) -> Result<Option<Account>> {
self.db.get::<tables::PlainAccountState>(address).map_err(Into::into) self.db.get::<tables::PlainAccountState>(address).map_err(Into::into)

View File

@ -23,7 +23,7 @@ pub(crate) use delegate_impls_to_as_ref;
/// Delegates the provider trait implementations to the `as_ref` function of the type: /// Delegates the provider trait implementations to the `as_ref` function of the type:
/// ///
/// [AccountProvider](crate::AccountProvider) /// [AccountReader](crate::AccountReader)
/// [BlockHashProvider](crate::BlockHashProvider) /// [BlockHashProvider](crate::BlockHashProvider)
/// [StateProvider](crate::StateProvider) /// [StateProvider](crate::StateProvider)
macro_rules! delegate_provider_impls { macro_rules! delegate_provider_impls {
@ -33,7 +33,7 @@ macro_rules! delegate_provider_impls {
StateRootProvider $(where [$($generics)*])? { StateRootProvider $(where [$($generics)*])? {
fn state_root(&self, state: crate::PostState) -> reth_interfaces::Result<reth_primitives::H256>; fn state_root(&self, state: crate::PostState) -> reth_interfaces::Result<reth_primitives::H256>;
} }
AccountProvider $(where [$($generics)*])? { AccountReader $(where [$($generics)*])? {
fn basic_account(&self, address: reth_primitives::Address) -> reth_interfaces::Result<Option<reth_primitives::Account>>; fn basic_account(&self, address: reth_primitives::Address) -> reth_interfaces::Result<Option<reth_primitives::Account>>;
} }
BlockHashProvider $(where [$($generics)*])? { BlockHashProvider $(where [$($generics)*])? {

View File

@ -1,6 +1,6 @@
use crate::{ use crate::{
traits::{BlockSource, ReceiptProvider}, traits::{BlockSource, ReceiptProvider},
AccountProvider, BlockHashProvider, BlockIdProvider, BlockNumProvider, BlockProvider, AccountReader, BlockHashProvider, BlockIdProvider, BlockNumProvider, BlockProvider,
BlockProviderIdExt, EvmEnvProvider, HeaderProvider, PostState, PostStateDataProvider, BlockProviderIdExt, EvmEnvProvider, HeaderProvider, PostState, PostStateDataProvider,
StateProvider, StateProviderBox, StateProviderFactory, StateRootProvider, TransactionsProvider, StateProvider, StateProviderBox, StateProviderFactory, StateRootProvider, TransactionsProvider,
WithdrawalsProvider, WithdrawalsProvider,
@ -362,7 +362,7 @@ impl BlockProviderIdExt for MockEthProvider {
} }
} }
impl AccountProvider for MockEthProvider { impl AccountReader for MockEthProvider {
fn basic_account(&self, address: Address) -> Result<Option<Account>> { fn basic_account(&self, address: Address) -> Result<Option<Account>> {
Ok(self.accounts.lock().get(&address).cloned().map(|a| a.account)) Ok(self.accounts.lock().get(&address).cloned().map(|a| a.account))
} }

View File

@ -1,6 +1,6 @@
use crate::{ use crate::{
traits::{BlockSource, ReceiptProvider}, traits::{BlockSource, ReceiptProvider},
AccountProvider, BlockHashProvider, BlockIdProvider, BlockNumProvider, BlockProvider, AccountReader, BlockHashProvider, BlockIdProvider, BlockNumProvider, BlockProvider,
BlockProviderIdExt, EvmEnvProvider, HeaderProvider, PostState, StageCheckpointReader, BlockProviderIdExt, EvmEnvProvider, HeaderProvider, PostState, StageCheckpointReader,
StateProvider, StateProviderBox, StateProviderFactory, StateRootProvider, TransactionsProvider, StateProvider, StateProviderBox, StateProviderFactory, StateRootProvider, TransactionsProvider,
WithdrawalsProvider, WithdrawalsProvider,
@ -212,7 +212,7 @@ impl HeaderProvider for NoopProvider {
} }
} }
impl AccountProvider for NoopProvider { impl AccountReader for NoopProvider {
fn basic_account(&self, _address: Address) -> Result<Option<Account>> { fn basic_account(&self, _address: Address) -> Result<Option<Account>> {
Ok(None) Ok(None)
} }

View File

@ -1,20 +1,23 @@
use auto_impl::auto_impl; use auto_impl::auto_impl;
use reth_interfaces::Result; use reth_interfaces::Result;
use reth_primitives::{Account, Address, BlockNumber}; use reth_primitives::{Account, Address, BlockNumber};
use std::{collections::BTreeSet, ops::RangeBounds}; use std::{
collections::{BTreeMap, BTreeSet},
ops::{RangeBounds, RangeInclusive},
};
/// Account provider /// Account reader
#[auto_impl(&, Arc, Box)] #[auto_impl(&, Arc, Box)]
pub trait AccountProvider: Send + Sync { pub trait AccountReader: Send + Sync {
/// Get basic account information. /// Get basic account information.
/// ///
/// Returns `None` if the account doesn't exist. /// Returns `None` if the account doesn't exist.
fn basic_account(&self, address: Address) -> Result<Option<Account>>; fn basic_account(&self, address: Address) -> Result<Option<Account>>;
} }
/// Account provider /// Account reader
#[auto_impl(&, Arc, Box)] #[auto_impl(&, Arc, Box)]
pub trait AccountExtProvider: Send + Sync { pub trait AccountExtReader: Send + Sync {
/// Iterate over account changesets and return all account address that were changed. /// Iterate over account changesets and return all account address that were changed.
fn changed_accounts_with_range( fn changed_accounts_with_range(
&self, &self,
@ -22,11 +25,44 @@ pub trait AccountExtProvider: Send + Sync {
) -> Result<BTreeSet<Address>>; ) -> Result<BTreeSet<Address>>;
/// Get basic account information for multiple accounts. A more efficient version than calling /// Get basic account information for multiple accounts. A more efficient version than calling
/// [`AccountProvider::basic_account`] repeatedly. /// [`AccountReader::basic_account`] repeatedly.
/// ///
/// Returns `None` if the account doesn't exist. /// Returns `None` if the account doesn't exist.
fn basic_accounts( fn basic_accounts(
&self, &self,
_iter: impl IntoIterator<Item = Address>, _iter: impl IntoIterator<Item = Address>,
) -> Result<Vec<(Address, Option<Account>)>>; ) -> Result<Vec<(Address, Option<Account>)>>;
/// Iterate over account changesets and return all account addresses that were changed alongside
/// each specific set of blocks.
///
/// NOTE: Get inclusive range of blocks.
fn changed_accounts_and_blocks_with_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> Result<BTreeMap<Address, Vec<BlockNumber>>>;
}
/// Account reader
#[auto_impl(&, Arc, Box)]
pub trait AccountWriter: Send + Sync {
/// Unwind and clear account hashing
fn unwind_account_hashing(&self, range: RangeInclusive<BlockNumber>) -> Result<()>;
/// Unwind and clear account history indices.
///
/// Returns number of changesets walked.
fn unwind_account_history_indices(&self, range: RangeInclusive<BlockNumber>) -> Result<usize>;
/// Insert account change index to database. Used inside AccountHistoryIndex stage
fn insert_account_history_index(
&self,
account_transitions: BTreeMap<Address, Vec<u64>>,
) -> Result<()>;
/// iterate over accounts and insert them to hashing table
fn insert_account_for_hashing(
&self,
accounts: impl IntoIterator<Item = (Address, Option<Account>)>,
) -> Result<()>;
} }

View File

@ -1,7 +1,7 @@
//! Collection of common provider traits. //! Collection of common provider traits.
mod account; mod account;
pub use account::{AccountExtProvider, AccountProvider}; pub use account::{AccountExtReader, AccountReader, AccountWriter};
mod block; mod block;
pub use block::{BlockProvider, BlockProviderIdExt, BlockSource}; pub use block::{BlockProvider, BlockProviderIdExt, BlockSource};

View File

@ -1,4 +1,4 @@
use super::AccountProvider; use super::AccountReader;
use crate::{post_state::PostState, BlockHashProvider, BlockIdProvider}; use crate::{post_state::PostState, BlockHashProvider, BlockIdProvider};
use auto_impl::auto_impl; use auto_impl::auto_impl;
use reth_interfaces::{provider::ProviderError, Result}; use reth_interfaces::{provider::ProviderError, Result};
@ -13,7 +13,7 @@ pub type StateProviderBox<'a> = Box<dyn StateProvider + 'a>;
/// An abstraction for a type that provides state data. /// An abstraction for a type that provides state data.
#[auto_impl(&, Arc, Box)] #[auto_impl(&, Arc, Box)]
pub trait StateProvider: pub trait StateProvider:
BlockHashProvider + AccountProvider + StateRootProvider + Send + Sync BlockHashProvider + AccountReader + StateRootProvider + Send + Sync
{ {
/// Get storage of given account. /// Get storage of given account.
fn storage(&self, account: Address, storage_key: StorageKey) -> Result<Option<StorageValue>>; fn storage(&self, account: Address, storage_key: StorageKey) -> Result<Option<StorageValue>>;

View File

@ -11,7 +11,7 @@ use reth_primitives::{
TransactionSignedEcRecovered, TxHash, EIP1559_TX_TYPE_ID, EIP2930_TX_TYPE_ID, TransactionSignedEcRecovered, TxHash, EIP1559_TX_TYPE_ID, EIP2930_TX_TYPE_ID,
LEGACY_TX_TYPE_ID, U256, LEGACY_TX_TYPE_ID, U256,
}; };
use reth_provider::{AccountProvider, StateProviderFactory}; use reth_provider::{AccountReader, StateProviderFactory};
use std::{fmt, marker::PhantomData, sync::Arc, time::Instant}; use std::{fmt, marker::PhantomData, sync::Arc, time::Instant};
/// A Result type returned after checking a transaction's validity. /// A Result type returned after checking a transaction's validity.