chore: move AccountWriter methods to HashingWriter and HistoryWriter (#3332)

Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
joshieDo authored on 2023-06-23 11:55:07 +01:00; committed by GitHub
parent f7d4756439
commit 3e07a5d508
9 changed files with 184 additions and 189 deletions
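For downstream code the change is purely an import/trait move: the account-level methods keep their signatures but are now reachable through `HashingWriter` and `HistoryWriter` instead of the removed `AccountWriter`, as the import diffs below show. A minimal sketch of what a call site looks like after this commit (the `provider` value, function name, and block range are placeholders; the trait methods and `reth_interfaces::Result` are the ones shown in the diff):

```rust
// Before: `use reth_provider::AccountWriter;`
// After: the same account methods live on HashingWriter and HistoryWriter.
use reth_provider::{HashingWriter, HistoryWriter};

fn unwind_accounts<P: HashingWriter + HistoryWriter>(
    provider: &P,
    range: std::ops::RangeInclusive<u64>,
) -> reth_interfaces::Result<()> {
    // Moved from AccountWriter to HashingWriter.
    provider.unwind_account_hashing(range.clone())?;
    // Moved from AccountWriter to HistoryWriter; returns the number of changesets walked.
    let _changesets = provider.unwind_account_history_indices(range)?;
    Ok(())
}
```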


@@ -9,9 +9,7 @@ use reth_db::{
version::{check_db_version_file, create_db_version_file, DatabaseVersionError},
};
use reth_primitives::{stage::StageId, Account, Bytecode, ChainSpec, StorageEntry, H256, U256};
-use reth_provider::{
-AccountWriter, DatabaseProviderRW, HashingWriter, HistoryWriter, PostState, ProviderFactory,
-};
+use reth_provider::{DatabaseProviderRW, HashingWriter, HistoryWriter, PostState, ProviderFactory};
use std::{collections::BTreeMap, fs, path::Path, sync::Arc};
use tracing::debug;


@@ -16,7 +16,7 @@ use reth_primitives::{
StageId,
},
};
-use reth_provider::{AccountExtReader, AccountWriter, DatabaseProviderRW};
+use reth_provider::{AccountExtReader, DatabaseProviderRW, HashingWriter};
use std::{
cmp::max,
fmt::Debug,


@@ -1,7 +1,7 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_db::database::Database;
use reth_primitives::stage::{StageCheckpoint, StageId};
-use reth_provider::{AccountExtReader, AccountWriter, DatabaseProviderRW};
+use reth_provider::{AccountExtReader, DatabaseProviderRW, HistoryWriter};
use std::fmt::Debug;
/// Stage that indexes the history of the account changesets generated in


@@ -15,8 +15,8 @@
/// Various provider traits.
mod traits;
pub use traits::{
-AccountExtReader, AccountReader, AccountWriter, BlockExecutor, BlockHashProvider,
-BlockIdProvider, BlockNumProvider, BlockProvider, BlockProviderIdExt, BlockSource,
+AccountExtReader, AccountReader, BlockExecutor, BlockHashProvider, BlockIdProvider,
+BlockNumProvider, BlockProvider, BlockProviderIdExt, BlockSource,
BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotification,
CanonStateNotificationSender, CanonStateNotifications, CanonStateSubscriptions, EvmEnvProvider,
ExecutorFactory, HashingWriter, HeaderProvider, HistoryWriter, PostStateDataProvider,


@@ -2,9 +2,9 @@ use crate::{
insert_canonical_block,
post_state::StorageChangeset,
traits::{AccountExtReader, BlockSource, ReceiptProvider, StageCheckpointWriter},
-AccountReader, AccountWriter, BlockHashProvider, BlockNumProvider, BlockProvider,
-EvmEnvProvider, HashingWriter, HeaderProvider, HistoryWriter, PostState, ProviderError,
-StageCheckpointReader, StorageReader, TransactionsProvider, WithdrawalsProvider,
+AccountReader, BlockHashProvider, BlockNumProvider, BlockProvider, EvmEnvProvider,
+HashingWriter, HeaderProvider, HistoryWriter, PostState, ProviderError, StageCheckpointReader,
+StorageReader, TransactionsProvider, WithdrawalsProvider,
};
use itertools::{izip, Itertools};
use reth_db::{
@@ -851,159 +851,6 @@ impl<'this, TX: DbTx<'this>> AccountExtReader for DatabaseProvider<'this, TX> {
}
}
impl<'this, TX: DbTxMut<'this> + DbTx<'this>> AccountWriter for DatabaseProvider<'this, TX> {
fn unwind_account_hashing(&self, range: RangeInclusive<BlockNumber>) -> Result<()> {
let mut hashed_accounts = self.tx.cursor_write::<tables::HashedAccount>()?;
// Aggregate all block changesets and make a list of accounts that have been changed.
self.tx
.cursor_read::<tables::AccountChangeSet>()?
.walk_range(range)?
.collect::<std::result::Result<Vec<_>, _>>()?
.into_iter()
.rev()
// fold all accounts to get the old balances/nonces and the accounts that need to be removed
.fold(
BTreeMap::new(),
|mut accounts: BTreeMap<Address, Option<Account>>, (_, account_before)| {
accounts.insert(account_before.address, account_before.info);
accounts
},
)
.into_iter()
// hash addresses and collect them inside a sorted BTreeMap.
// We are doing keccak only once per address.
.map(|(address, account)| (keccak256(address), account))
.collect::<BTreeMap<_, _>>()
.into_iter()
// Apply values to the HashedAccount table (if the account is None, remove it).
.try_for_each(|(hashed_address, account)| -> Result<()> {
if let Some(account) = account {
hashed_accounts.upsert(hashed_address, account)?;
} else if hashed_accounts.seek_exact(hashed_address)?.is_some() {
hashed_accounts.delete_current()?;
}
Ok(())
})?;
Ok(())
}
fn unwind_account_history_indices(&self, range: RangeInclusive<BlockNumber>) -> Result<usize> {
let account_changeset = self
.tx
.cursor_read::<tables::AccountChangeSet>()?
.walk_range(range)?
.collect::<std::result::Result<Vec<_>, _>>()?;
let changesets = account_changeset.len();
let last_indices = account_changeset
.into_iter()
// reverse so we can get the lowest block number where we need to unwind the account.
.rev()
// fold all accounts and keep the lowest block number for each
.fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, u64>, (index, account)| {
// we just need address and lowest block number.
accounts.insert(account.address, index);
accounts
});
// try to unwind the index
let mut cursor = self.tx.cursor_write::<tables::AccountHistory>()?;
for (address, rem_index) in last_indices {
let shard_part = unwind_account_history_shards::<TX>(&mut cursor, address, rem_index)?;
// check the last shard_part; if present, its items need to be reinserted.
if !shard_part.is_empty() {
// there are items in the list
self.tx.put::<tables::AccountHistory>(
ShardedKey::new(address, u64::MAX),
BlockNumberList::new(shard_part)
.expect("There is at least one element in list and it is sorted."),
)?;
}
}
Ok(changesets)
}
fn insert_account_history_index(
&self,
account_transitions: BTreeMap<Address, Vec<u64>>,
) -> Result<()> {
// insert indexes to AccountHistory.
for (address, mut indices) in account_transitions {
// Load the last shard and check if it is full and remove it if it is not. If the list is empty, the
// last shard was full or there are no shards at all.
let mut last_shard = {
let mut cursor = self.tx.cursor_read::<tables::AccountHistory>()?;
let last = cursor.seek_exact(ShardedKey::new(address, u64::MAX))?;
if let Some((shard_key, list)) = last {
// delete old shard so new one can be inserted.
self.tx.delete::<tables::AccountHistory>(shard_key, None)?;
let list = list.iter(0).map(|i| i as u64).collect::<Vec<_>>();
list
} else {
Vec::new()
}
};
last_shard.append(&mut indices);
// chunk indices and insert them in shards of N size.
let mut chunks = last_shard
.iter()
.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD)
.into_iter()
.map(|chunks| chunks.map(|i| *i as usize).collect::<Vec<usize>>())
.collect::<Vec<_>>();
let last_chunk = chunks.pop();
chunks.into_iter().try_for_each(|list| {
self.tx.put::<tables::AccountHistory>(
ShardedKey::new(
address,
*list.last().expect("Chuck does not return empty list") as BlockNumber,
),
BlockNumberList::new(list).expect("Indices are presorted and not empty"),
)
})?;
// Insert last list with u64::MAX
if let Some(last_list) = last_chunk {
self.tx.put::<tables::AccountHistory>(
ShardedKey::new(address, u64::MAX),
BlockNumberList::new(last_list).expect("Indices are presorted and not empty"),
)?
}
}
Ok(())
}
fn insert_account_for_hashing(
&self,
accounts: impl IntoIterator<Item = (Address, Option<Account>)>,
) -> Result<()> {
let mut hashed_accounts = self.tx.cursor_write::<tables::HashedAccount>()?;
let hashes_accounts = accounts.into_iter().fold(
BTreeMap::new(),
|mut map: BTreeMap<H256, Option<Account>>, (address, account)| {
map.insert(keccak256(address), account);
map
},
);
hashes_accounts.into_iter().try_for_each(|(hashed_address, account)| -> Result<()> {
if let Some(account) = account {
hashed_accounts.upsert(hashed_address, account)?
} else if hashed_accounts.seek_exact(hashed_address)?.is_some() {
hashed_accounts.delete_current()?;
}
Ok(())
})?;
Ok(())
}
}
impl<'this, TX: DbTx<'this>> HeaderProvider for DatabaseProvider<'this, TX> {
fn header(&self, block_hash: &BlockHash) -> Result<Option<Header>> {
if let Some(num) = self.block_number(*block_hash)? {
@@ -1648,6 +1495,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HashingWriter for DatabaseProvider
Ok(())
}
fn insert_storage_for_hashing(
&self,
storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
@@ -1683,6 +1531,68 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HashingWriter for DatabaseProvider
})?;
Ok(())
}
fn unwind_account_hashing(&self, range: RangeInclusive<BlockNumber>) -> Result<()> {
let mut hashed_accounts = self.tx.cursor_write::<tables::HashedAccount>()?;
// Aggregate all block changesets and make a list of accounts that have been changed.
self.tx
.cursor_read::<tables::AccountChangeSet>()?
.walk_range(range)?
.collect::<std::result::Result<Vec<_>, _>>()?
.into_iter()
.rev()
// fold all accounts to get the old balances/nonces and the accounts that need to be removed
.fold(
BTreeMap::new(),
|mut accounts: BTreeMap<Address, Option<Account>>, (_, account_before)| {
accounts.insert(account_before.address, account_before.info);
accounts
},
)
.into_iter()
// hash addresses and collect them inside a sorted BTreeMap.
// We are doing keccak only once per address.
.map(|(address, account)| (keccak256(address), account))
.collect::<BTreeMap<_, _>>()
.into_iter()
// Apply values to the HashedAccount table (if the account is None, remove it).
.try_for_each(|(hashed_address, account)| -> Result<()> {
if let Some(account) = account {
hashed_accounts.upsert(hashed_address, account)?;
} else if hashed_accounts.seek_exact(hashed_address)?.is_some() {
hashed_accounts.delete_current()?;
}
Ok(())
})?;
Ok(())
}
fn insert_account_for_hashing(
&self,
accounts: impl IntoIterator<Item = (Address, Option<Account>)>,
) -> Result<()> {
let mut hashed_accounts = self.tx.cursor_write::<tables::HashedAccount>()?;
let hashes_accounts = accounts.into_iter().fold(
BTreeMap::new(),
|mut map: BTreeMap<H256, Option<Account>>, (address, account)| {
map.insert(keccak256(address), account);
map
},
);
hashes_accounts.into_iter().try_for_each(|(hashed_address, account)| -> Result<()> {
if let Some(account) = account {
hashed_accounts.upsert(hashed_address, account)?
} else if hashed_accounts.seek_exact(hashed_address)?.is_some() {
hashed_accounts.delete_current()?;
}
Ok(())
})?;
Ok(())
}
}
impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HistoryWriter for DatabaseProvider<'this, TX> {
@@ -1701,6 +1611,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HistoryWriter for DatabaseProvider
Ok(())
}
fn insert_storage_history_index(
&self,
storage_transitions: BTreeMap<(Address, H256), Vec<u64>>,
@@ -1739,6 +1650,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HistoryWriter for DatabaseProvider
}
Ok(())
}
fn unwind_storage_history_indices(&self, range: Range<BlockNumberAddress>) -> Result<usize> {
let storage_changesets = self
.tx
@@ -1779,4 +1691,93 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HistoryWriter for DatabaseProvider
Ok(changesets)
}
fn insert_account_history_index(
&self,
account_transitions: BTreeMap<Address, Vec<u64>>,
) -> Result<()> {
// insert indexes to AccountHistory.
for (address, mut indices) in account_transitions {
// Load the last shard and check if it is full and remove it if it is not. If the list is empty, the
// last shard was full or there are no shards at all.
let mut last_shard = {
let mut cursor = self.tx.cursor_read::<tables::AccountHistory>()?;
let last = cursor.seek_exact(ShardedKey::new(address, u64::MAX))?;
if let Some((shard_key, list)) = last {
// delete old shard so new one can be inserted.
self.tx.delete::<tables::AccountHistory>(shard_key, None)?;
let list = list.iter(0).map(|i| i as u64).collect::<Vec<_>>();
list
} else {
Vec::new()
}
};
last_shard.append(&mut indices);
// chunk indices and insert them in shards of N size.
let mut chunks = last_shard
.iter()
.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD)
.into_iter()
.map(|chunks| chunks.map(|i| *i as usize).collect::<Vec<usize>>())
.collect::<Vec<_>>();
let last_chunk = chunks.pop();
chunks.into_iter().try_for_each(|list| {
self.tx.put::<tables::AccountHistory>(
ShardedKey::new(
address,
*list.last().expect("Chuck does not return empty list") as BlockNumber,
),
BlockNumberList::new(list).expect("Indices are presorted and not empty"),
)
})?;
// Insert last list with u64::MAX
if let Some(last_list) = last_chunk {
self.tx.put::<tables::AccountHistory>(
ShardedKey::new(address, u64::MAX),
BlockNumberList::new(last_list).expect("Indices are presorted and not empty"),
)?
}
}
Ok(())
}
fn unwind_account_history_indices(&self, range: RangeInclusive<BlockNumber>) -> Result<usize> {
let account_changeset = self
.tx
.cursor_read::<tables::AccountChangeSet>()?
.walk_range(range)?
.collect::<std::result::Result<Vec<_>, _>>()?;
let changesets = account_changeset.len();
let last_indices = account_changeset
.into_iter()
// reverse so we can get the lowest block number where we need to unwind the account.
.rev()
// fold all accounts and keep the lowest block number for each
.fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, u64>, (index, account)| {
// we just need address and lowest block number.
accounts.insert(account.address, index);
accounts
});
// try to unwind the index
let mut cursor = self.tx.cursor_write::<tables::AccountHistory>()?;
for (address, rem_index) in last_indices {
let shard_part = unwind_account_history_shards::<TX>(&mut cursor, address, rem_index)?;
// check the last shard_part; if present, its items need to be reinserted.
if !shard_part.is_empty() {
// there are items in the list
self.tx.put::<tables::AccountHistory>(
ShardedKey::new(address, u64::MAX),
BlockNumberList::new(shard_part)
.expect("There is at least one element in list and it is sorted."),
)?;
}
}
Ok(changesets)
}
}
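For context on the history methods above: per-address block indices are persisted as BlockNumberList shards of at most sharded_key::NUM_OF_INDICES_IN_SHARD entries, each keyed by ShardedKey(address, highest block in shard), while the most recent (possibly partial) shard is always keyed with u64::MAX so a single seek_exact can find and extend it. The following is an illustrative, self-contained sketch of that chunking step only, using a hypothetical shard size of 3 in place of the real constant:

```rust
// Illustrative only: mirrors the shard-splitting done by insert_account_history_index,
// with a hypothetical SHARD_SIZE instead of sharded_key::NUM_OF_INDICES_IN_SHARD.
const SHARD_SIZE: usize = 3;

fn split_into_shards(indices: Vec<u64>) -> Vec<(u64, Vec<u64>)> {
    let mut shards: Vec<(u64, Vec<u64>)> = indices
        .chunks(SHARD_SIZE)
        // Full shards are keyed by the highest block number they contain.
        .map(|chunk| (*chunk.last().unwrap(), chunk.to_vec()))
        .collect();
    // The final (possibly partial) shard is re-keyed to u64::MAX so later
    // insertions can locate it with a single seek_exact and append to it.
    if let Some(last) = shards.last_mut() {
        last.0 = u64::MAX;
    }
    shards
}

// split_into_shards(vec![1, 2, 3, 4, 5])
//     == vec![(3, vec![1, 2, 3]), (u64::MAX, vec![4, 5])]
```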


@@ -42,27 +42,3 @@ pub trait AccountExtReader: Send + Sync {
range: RangeInclusive<BlockNumber>,
) -> Result<BTreeMap<Address, Vec<BlockNumber>>>;
}
/// Account writer
#[auto_impl(&, Arc, Box)]
pub trait AccountWriter: Send + Sync {
/// Unwind and clear account hashing
fn unwind_account_hashing(&self, range: RangeInclusive<BlockNumber>) -> Result<()>;
/// Unwind and clear account history indices.
///
/// Returns number of changesets walked.
fn unwind_account_history_indices(&self, range: RangeInclusive<BlockNumber>) -> Result<usize>;
/// Insert account change index into the database. Used inside the AccountHistoryIndex stage.
fn insert_account_history_index(
&self,
account_transitions: BTreeMap<Address, Vec<u64>>,
) -> Result<()>;
/// Iterate over accounts and insert them into the hashing table.
fn insert_account_for_hashing(
&self,
accounts: impl IntoIterator<Item = (Address, Option<Account>)>,
) -> Result<()>;
}


@@ -1,12 +1,21 @@
use auto_impl::auto_impl;
use reth_db::models::BlockNumberAddress;
use reth_interfaces::Result;
-use reth_primitives::{Address, BlockNumber, StorageEntry, H256};
+use reth_primitives::{Account, Address, BlockNumber, StorageEntry, H256};
use std::ops::{Range, RangeInclusive};
/// Hashing Writer
#[auto_impl(&, Arc, Box)]
pub trait HashingWriter: Send + Sync {
/// Unwind and clear account hashing
fn unwind_account_hashing(&self, range: RangeInclusive<BlockNumber>) -> Result<()>;
/// Inserts all accounts into the [reth_db::tables::HashedAccount] table.
fn insert_account_for_hashing(
&self,
accounts: impl IntoIterator<Item = (Address, Option<Account>)>,
) -> Result<()>;
/// Unwind and clear storage hashing
fn unwind_storage_hashing(&self, range: Range<BlockNumberAddress>) -> Result<()>;
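With the two account methods added to this trait, a hashing-stage-style caller can pass plain (Address, Option<Account>) pairs and let the provider handle keccak hashing and the removal of destroyed accounts. A minimal hedged sketch under the assumption of some provider implementing HashingWriter (the function name, address, and account values are placeholders):

```rust
use reth_primitives::{Account, Address, U256};
use reth_provider::HashingWriter;

fn seed_hashed_account<P: HashingWriter>(
    provider: &P,
    address: Address,
) -> reth_interfaces::Result<()> {
    let account = Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None };
    // Some(account) upserts the entry under keccak256(address) in HashedAccount;
    // passing None instead would delete any existing entry for that address.
    provider.insert_account_for_hashing([(address, Some(account))])?;
    Ok(())
}
```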


@@ -10,6 +10,17 @@ use std::{
/// History Writer
#[auto_impl(&, Arc, Box)]
pub trait HistoryWriter: Send + Sync {
/// Unwind and clear account history indices.
///
/// Returns number of changesets walked.
fn unwind_account_history_indices(&self, range: RangeInclusive<BlockNumber>) -> Result<usize>;
/// Insert account change index into the database. Used inside the AccountHistoryIndex stage.
fn insert_account_history_index(
&self,
account_transitions: BTreeMap<Address, Vec<u64>>,
) -> Result<()>;
/// Unwind and clear storage history indices.
///
/// Returns number of changesets walked.
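The account-history methods moved here follow the same pattern as the storage ones: the index insert takes a map from address to the block numbers at which that account changed. A minimal hedged sketch (the function name, address, and block numbers are placeholders; the trait method is the one declared above):

```rust
use std::collections::BTreeMap;

use reth_primitives::Address;
use reth_provider::HistoryWriter;

fn index_account_changes<P: HistoryWriter>(
    provider: &P,
    address: Address,
) -> reth_interfaces::Result<()> {
    // Blocks in which `address` was changed, in ascending order.
    let mut transitions: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
    transitions.insert(address, vec![100, 105, 250]);
    // Appends to the address's last (u64::MAX-keyed) shard and re-chunks into
    // shards of NUM_OF_INDICES_IN_SHARD, as in the DatabaseProvider impl above.
    provider.insert_account_history_index(transitions)?;
    Ok(())
}
```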


@@ -1,7 +1,7 @@
//! Collection of common provider traits.
mod account;
-pub use account::{AccountExtReader, AccountReader, AccountWriter};
+pub use account::{AccountExtReader, AccountReader};
mod storage;
pub use storage::StorageReader;