feat: index account/storage history (#978)

Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
rakita
2023-01-26 17:03:02 +01:00
committed by GitHub
parent 1d5cce1092
commit 6dcced0cfb
10 changed files with 1106 additions and 52 deletions

View File

@@ -0,0 +1,474 @@
use crate::{
db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput,
};
use itertools::Itertools;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::{Database, DatabaseGAT},
models::{sharded_key::NUM_OF_INDICES_IN_SHARD, ShardedKey},
tables,
transaction::{DbTx, DbTxMut, DbTxMutGAT},
TransitionList,
};
use reth_primitives::{Address, TransitionId};
use std::{collections::BTreeMap, fmt::Debug};
use tracing::*;
const INDEX_ACCOUNT_HISTORY: StageId = StageId("IndexAccountHistoryStage");
/// Stage for indexing the account history from account changesets.
/// For each changed account, the transition ids at which it changed are appended to
/// sharded index lists in `tables::AccountHistory`.
#[derive(Debug)]
pub struct IndexAccountHistoryStage {
/// Number of blocks after which the control
/// flow will be returned to the pipeline for commit.
pub commit_threshold: u64,
}
impl Default for IndexAccountHistoryStage {
fn default() -> Self {
Self { commit_threshold: 100_000 }
}
}
#[async_trait::async_trait]
impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
/// Return the id of the stage
fn id(&self) -> StageId {
INDEX_ACCOUNT_HISTORY
}
/// Execute the stage.
async fn execute(
&mut self,
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let stage_progress = input.stage_progress.unwrap_or_default();
let previous_stage_progress = input.previous_stage_progress();
// read account changesets, merge them into one set of changed accounts and build the history indices.
let from_transition = tx.get_block_transition(stage_progress)?;
// NOTE: the threshold could probably be taken over transitions instead of blocks, since
// transitions better reflect the amount of work, but that is a guessing game for later.
let to_block =
std::cmp::min(stage_progress + self.commit_threshold, previous_stage_progress);
let to_transition = tx.get_block_transition(to_block)?;
let account_changesets = tx
.cursor_read::<tables::AccountChangeSet>()?
.walk(from_transition)?
.take_while(|res| res.as_ref().map(|(k, _)| *k < to_transition).unwrap_or_default())
.collect::<Result<Vec<_>, _>>()?;
let account_changeset_lists = account_changesets
.into_iter()
// fold all accounts into one set of changed accounts
.fold(
BTreeMap::new(),
|mut accounts: BTreeMap<Address, Vec<u64>>, (index, account)| {
accounts.entry(account.address).or_default().push(index);
accounts
},
);
// insert the indices into AccountHistory.
for (address, mut indices) in account_changeset_lists {
let mut last_shard = take_last_account_shard(tx, address)?;
last_shard.append(&mut indices);
// chunk the indices and insert them in shards of size N.
let mut chunks = last_shard
.iter()
.chunks(NUM_OF_INDICES_IN_SHARD)
.into_iter()
.map(|chunks| chunks.map(|i| *i as usize).collect::<Vec<usize>>())
.collect::<Vec<_>>();
let last_chunk = chunks.pop();
chunks.into_iter().try_for_each(|list| {
tx.put::<tables::AccountHistory>(
ShardedKey::new(
address,
*list.last().expect("Chuck does not return empty list") as TransitionId,
),
TransitionList::new(list).expect("Indices are presorted and not empty"),
)
})?;
// Insert the last, still-open list under the u64::MAX key
if let Some(last_list) = last_chunk {
tx.put::<tables::AccountHistory>(
ShardedKey::new(address, u64::MAX),
TransitionList::new(last_list).expect("Indices are presorted and not empty"),
)?
}
}
info!(target: "sync::stages::index_account_history", "Stage finished");
Ok(ExecOutput { stage_progress: to_block, done: true })
}
/// Unwind the stage.
async fn unwind(
&mut self,
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
info!(target: "sync::stages::index_account_history", to_block = input.unwind_to, "Unwinding");
let from_transition_rev = tx.get_block_transition(input.unwind_to)?;
let to_transition_rev = tx.get_block_transition(input.stage_progress)?;
let mut cursor = tx.cursor_write::<tables::AccountHistory>()?;
let account_changeset = tx
.cursor_read::<tables::AccountChangeSet>()?
.walk(from_transition_rev)?
.take_while(|res| res.as_ref().map(|(k, _)| *k < to_transition_rev).unwrap_or_default())
.collect::<Result<Vec<_>, _>>()?;
let last_indices = account_changeset
.into_iter()
// reverse so we can get the lowest transition id at which each account changed.
.rev()
// fold all accounts, keeping the lowest transition index for each
.fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, u64>, (index, account)| {
// we just need address and lowest transition id.
accounts.insert(account.address, index);
accounts
});
// try to unwind the index
for (address, rem_index) in last_indices {
let shard_part = unwind_account_history_shards::<DB>(&mut cursor, address, rem_index)?;
// check the last shard_part; if present, its items need to be reinserted.
if !shard_part.is_empty() {
// there are items in list
tx.put::<tables::AccountHistory>(
ShardedKey::new(address, u64::MAX),
TransitionList::new(shard_part)
.expect("There is at least one element in list and it is sorted."),
)?;
}
}
// everything in the history index above the unwind point has now been removed.
Ok(UnwindOutput { stage_progress: input.unwind_to })
}
}
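The write pattern in `execute` above splits each account's indices into chunks of `NUM_OF_INDICES_IN_SHARD`: every full chunk is stored under its own highest transition id, while the final, possibly partial, chunk is stored under the `u64::MAX` key. A minimal sketch of just that keying rule (a hypothetical helper for illustration, not part of this commit):

// Illustrative: compute the (shard key, chunk) pairs that `execute` writes for
// one address. The last chunk always stays open under the u64::MAX key.
fn shard_chunks(indices: &[u64], shard_size: usize) -> Vec<(u64, Vec<u64>)> {
    let mut out: Vec<(u64, Vec<u64>)> = indices
        .chunks(shard_size)
        .map(|c| (*c.last().expect("chunks() never yields an empty slice"), c.to_vec()))
        .collect();
    if let Some(last) = out.last_mut() {
        last.0 = u64::MAX;
    }
    out
}

For example, with `shard_size = 2`, the indices `[1, 2, 3]` become `[(2, [1, 2]), (u64::MAX, [3])]`.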
/// Load the last shard for the address and remove it from the database so that a new one can be
/// inserted. If the returned list is empty, there were no shards for this address at all.
pub fn take_last_account_shard<DB: Database>(
tx: &Transaction<'_, DB>,
address: Address,
) -> Result<Vec<u64>, StageError> {
let mut cursor = tx.cursor_read::<tables::AccountHistory>()?;
let last = cursor.seek_exact(ShardedKey::new(address, u64::MAX))?;
if let Some((shard_key, list)) = last {
// delete old shard so new one can be inserted.
tx.delete::<tables::AccountHistory>(shard_key, None)?;
let list = list.iter(0).map(|i| i as u64).collect::<Vec<_>>();
return Ok(list)
}
Ok(Vec::new())
}
/// Unwind all history shards. For the boundary shard, remove it from the database and
/// return the part of the shard whose items are still valid. If all removed shards were
/// full, the returned list will be empty.
pub fn unwind_account_history_shards<DB: Database>(
cursor: &mut <<DB as DatabaseGAT<'_>>::TXMut as DbTxMutGAT<'_>>::CursorMut<
tables::AccountHistory,
>,
address: Address,
transition_id: TransitionId,
) -> Result<Vec<usize>, StageError> {
let mut item = cursor.seek_exact(ShardedKey::new(address, u64::MAX))?;
while let Some((sharded_key, list)) = item {
// there are no more shards for this address
if sharded_key.key != address {
break
}
cursor.delete_current()?;
// check the first item: if it is greater than or equal to `transition_id`, the whole
// shard is unwound, so keep it deleted and move to the previous shard.
let first = list.iter(0).next().expect("List can't be empty");
if first >= transition_id as usize {
item = cursor.prev()?;
continue
} else if transition_id <= sharded_key.highest_transition_id {
// the first element is below `transition_id`, so the shard is only partially
// unwound; keep the still-valid prefix.
return Ok(list.iter(0).take_while(|i| *i < transition_id as usize).collect::<Vec<_>>())
} else {
let new_list = list.iter(0).collect::<Vec<_>>();
return Ok(new_list)
}
}
Ok(Vec::new())
}
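The loop above walks shards backwards from the `u64::MAX` key and, for the boundary shard, decides how much of it survives. A simplified sketch of that per-shard decision on a plain slice (hypothetical helper, illustration only; the two surviving cases of the real function collapse into one `take_while`):

// Illustrative: what remains of one shard after unwinding to `transition_id`.
// The real function reinserts a non-empty result under the u64::MAX key.
fn surviving_part(shard: &[u64], transition_id: u64) -> Vec<u64> {
    if *shard.first().expect("shards are never empty") >= transition_id {
        // the whole shard is unwound; nothing survives
        return Vec::new()
    }
    // keep only the prefix of indices strictly below the unwind point
    shard.iter().copied().take_while(|i| *i < transition_id).collect()
}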
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::{TestTransaction, PREV_STAGE_ID};
use reth_db::models::AccountBeforeTx;
use reth_primitives::{hex_literal::hex, H160};
const ADDRESS: H160 = H160(hex!("0000000000000000000000000000000000000001"));
fn acc() -> AccountBeforeTx {
AccountBeforeTx { address: ADDRESS, info: None }
}
/// Shard for account
fn shard(shard_index: u64) -> ShardedKey<H160> {
ShardedKey { key: ADDRESS, highest_transition_id: shard_index }
}
fn list(list: &[usize]) -> TransitionList {
TransitionList::new(list).unwrap()
}
fn cast(
table: Vec<(ShardedKey<H160>, TransitionList)>,
) -> BTreeMap<ShardedKey<H160>, Vec<usize>> {
table
.into_iter()
.map(|(k, v)| {
let v = v.iter(0).collect();
(k, v)
})
.collect()
}
fn partial_setup(tx: &TestTransaction) {
// setup
tx.commit(|tx| {
// we only need the first and last mappings: block 0 ends at transition 3, block 5 at transition 7
tx.put::<tables::BlockTransitionIndex>(0, 3).unwrap();
tx.put::<tables::BlockTransitionIndex>(5, 7).unwrap();
// set up the changesets (at transitions 4 and 6) that will be applied to the history index
tx.put::<tables::AccountChangeSet>(4, acc()).unwrap();
tx.put::<tables::AccountChangeSet>(6, acc()).unwrap();
Ok(())
})
.unwrap()
}
async fn run(tx: &TestTransaction, run_to: u64) {
let mut input = ExecInput::default();
input.previous_stage = Some((PREV_STAGE_ID, run_to));
let mut stage = IndexAccountHistoryStage::default();
let mut tx = tx.inner();
let out = stage.execute(&mut tx, input).await.unwrap();
assert_eq!(out, ExecOutput { stage_progress: 5, done: true });
tx.commit().unwrap();
}
async fn unwind(tx: &TestTransaction, unwind_from: u64, unwind_to: u64) {
let mut input = UnwindInput::default();
input.stage_progress = unwind_from;
input.unwind_to = unwind_to;
let mut stage = IndexAccountHistoryStage::default();
let mut tx = tx.inner();
let out = stage.unwind(&mut tx, input).await.unwrap();
assert_eq!(out, UnwindOutput { stage_progress: unwind_to });
tx.commit().unwrap();
}
#[tokio::test]
async fn insert_index_to_empty() {
// init
let tx = TestTransaction::default();
// setup
partial_setup(&tx);
// run
run(&tx, 5).await;
// verify
let table = cast(tx.table::<tables::AccountHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![4, 6]),]));
// unwind
unwind(&tx, 5, 0).await;
// verify initial state
let table = tx.table::<tables::AccountHistory>().unwrap();
assert!(table.is_empty());
}
#[tokio::test]
async fn insert_index_to_not_empty_shard() {
// init
let tx = TestTransaction::default();
// setup
partial_setup(&tx);
tx.commit(|tx| {
tx.put::<tables::AccountHistory>(shard(u64::MAX), list(&[1, 2, 3])).unwrap();
Ok(())
})
.unwrap();
// run
run(&tx, 5).await;
// verify
let table = cast(tx.table::<tables::AccountHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3, 4, 6]),]));
// unwind
unwind(&tx, 5, 0).await;
// verify initial state
let table = cast(tx.table::<tables::AccountHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3]),]));
}
#[tokio::test]
async fn insert_index_to_full_shard() {
// init
let tx = TestTransaction::default();
let full_list = vec![3; NUM_OF_INDICES_IN_SHARD];
// setup
partial_setup(&tx);
tx.commit(|tx| {
tx.put::<tables::AccountHistory>(shard(u64::MAX), list(&full_list)).unwrap();
Ok(())
})
.unwrap();
// run
run(&tx, 5).await;
// verify
let table = cast(tx.table::<tables::AccountHistory>().unwrap());
assert_eq!(
table,
BTreeMap::from([(shard(3), full_list.clone()), (shard(u64::MAX), vec![4, 6])])
);
// unwind
unwind(&tx, 5, 0).await;
// verify initial state
let table = cast(tx.table::<tables::AccountHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), full_list)]));
}
#[tokio::test]
async fn insert_index_to_fill_shard() {
// init
let tx = TestTransaction::default();
let mut close_full_list = vec![1; NUM_OF_INDICES_IN_SHARD - 2];
// setup
partial_setup(&tx);
tx.commit(|tx| {
tx.put::<tables::AccountHistory>(shard(u64::MAX), list(&close_full_list)).unwrap();
Ok(())
})
.unwrap();
// run
run(&tx, 5).await;
// verify
close_full_list.push(4);
close_full_list.push(6);
let table = cast(tx.table::<tables::AccountHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), close_full_list.clone()),]));
// unwind
unwind(&tx, 5, 0).await;
// verify initial state
close_full_list.pop();
close_full_list.pop();
let table = cast(tx.table::<tables::AccountHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), close_full_list),]));
}
#[tokio::test]
async fn insert_index_second_half_shard() {
// init
let tx = TestTransaction::default();
let mut close_full_list = vec![1; NUM_OF_INDICES_IN_SHARD - 1];
// setup
partial_setup(&tx);
tx.commit(|tx| {
tx.put::<tables::AccountHistory>(shard(u64::MAX), list(&close_full_list)).unwrap();
Ok(())
})
.unwrap();
// run
run(&tx, 5).await;
// verify
close_full_list.push(4);
let table = cast(tx.table::<tables::AccountHistory>().unwrap());
assert_eq!(
table,
BTreeMap::from([(shard(4), close_full_list.clone()), (shard(u64::MAX), vec![6])])
);
// unwind
unwind(&tx, 5, 0).await;
// verify initial state
close_full_list.pop();
let table = cast(tx.table::<tables::AccountHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), close_full_list),]));
}
#[tokio::test]
async fn insert_index_to_third_shard() {
// init
let tx = TestTransaction::default();
let full_list = vec![1; NUM_OF_INDICES_IN_SHARD];
// setup
partial_setup(&tx);
tx.commit(|tx| {
tx.put::<tables::AccountHistory>(shard(1), list(&full_list)).unwrap();
tx.put::<tables::AccountHistory>(shard(2), list(&full_list)).unwrap();
tx.put::<tables::AccountHistory>(shard(u64::MAX), list(&[2, 3])).unwrap();
Ok(())
})
.unwrap();
run(&tx, 5).await;
// verify
let table = cast(tx.table::<tables::AccountHistory>().unwrap());
assert_eq!(
table,
BTreeMap::from([
(shard(1), full_list.clone()),
(shard(2), full_list.clone()),
(shard(u64::MAX), vec![2, 3, 4, 6])
])
);
// unwind
unwind(&tx, 5, 0).await;
// verify initial state
let table = cast(tx.table::<tables::AccountHistory>().unwrap());
assert_eq!(
table,
BTreeMap::from([
(shard(1), full_list.clone()),
(shard(2), full_list.clone()),
(shard(u64::MAX), vec![2, 3])
])
);
}
}

View File

@@ -0,0 +1,503 @@
use crate::{
db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput,
};
use itertools::Itertools;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::{Database, DatabaseGAT},
models::{sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey},
tables,
transaction::{DbTx, DbTxMut, DbTxMutGAT},
TransitionList,
};
use reth_primitives::{Address, TransitionId, H256};
use std::{collections::BTreeMap, fmt::Debug};
use tracing::*;
const INDEX_STORAGE_HISTORY: StageId = StageId("IndexStorageHistoryStage");
/// Stage for indexing the storage history from storage changesets.
/// For each changed storage slot, the transition ids at which it changed are appended to
/// sharded index lists in `tables::StorageHistory`.
#[derive(Debug)]
pub struct IndexStorageHistoryStage {
/// Number of blocks after which the control
/// flow will be returned to the pipeline for commit.
pub commit_threshold: u64,
}
impl Default for IndexStorageHistoryStage {
fn default() -> Self {
Self { commit_threshold: 100_000 }
}
}
#[async_trait::async_trait]
impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
/// Return the id of the stage
fn id(&self) -> StageId {
INDEX_STORAGE_HISTORY
}
/// Execute the stage.
async fn execute(
&mut self,
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let stage_progress = input.stage_progress.unwrap_or_default();
let previous_stage_progress = input.previous_stage_progress();
// read storage changesets, merge them into one set of changed storage slots and build the history indices.
let from_transition = tx.get_block_transition(stage_progress)?;
// NOTE: the threshold could probably be taken over transitions instead of blocks, since
// transitions better reflect the amount of work, but that is a guessing game for later.
let to_block =
std::cmp::min(stage_progress + self.commit_threshold, previous_stage_progress);
let to_transition = tx.get_block_transition(to_block)?;
let storage_changesets = tx
.cursor_read::<tables::StorageChangeSet>()?
.walk((from_transition, Address::zero()).into())?
.take_while(|res| {
res.as_ref().map(|(k, _)| k.transition_id() < to_transition).unwrap_or_default()
})
.collect::<Result<Vec<_>, _>>()?;
// fold all storages to one set of changes
let storage_changeset_lists = storage_changesets.into_iter().fold(
BTreeMap::new(),
|mut storages: BTreeMap<(Address, H256), Vec<u64>>, (index, storage)| {
storages
.entry((index.address(), storage.key))
.or_default()
.push(index.transition_id());
storages
},
);
for ((address, storage_key), mut indices) in storage_changeset_lists {
let mut last_shard = take_last_storage_shard(tx, address, storage_key)?;
last_shard.append(&mut indices);
// chunk the indices and insert them in shards of size N.
let mut chunks = last_shard
.iter()
.chunks(NUM_OF_INDICES_IN_SHARD)
.into_iter()
.map(|chunks| chunks.map(|i| *i as usize).collect::<Vec<usize>>())
.collect::<Vec<_>>();
let last_chunk = chunks.pop();
chunks.into_iter().try_for_each(|list| {
tx.put::<tables::StorageHistory>(
StorageShardedKey::new(
address,
storage_key,
*list.last().expect("Chuck does not return empty list") as TransitionId,
),
TransitionList::new(list).expect("Indices are presorted and not empty"),
)
})?;
// Insert the last, still-open list under the u64::MAX key
if let Some(last_list) = last_chunk {
tx.put::<tables::StorageHistory>(
StorageShardedKey::new(address, storage_key, u64::MAX),
TransitionList::new(last_list).expect("Indices are presorted and not empty"),
)?;
}
}
info!(target: "sync::stages::index_storage_history", "Stage finished");
Ok(ExecOutput { stage_progress: to_block, done: true })
}
/// Unwind the stage.
async fn unwind(
&mut self,
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
info!(target: "sync::stages::index_account_history", to_block = input.unwind_to, "Unwinding");
let from_transition_rev = tx.get_block_transition(input.unwind_to)?;
let to_transition_rev = tx.get_block_transition(input.stage_progress)?;
let mut cursor = tx.cursor_write::<tables::StorageHistory>()?;
let storage_changesets = tx
.cursor_read::<tables::StorageChangeSet>()?
.walk((from_transition_rev, Address::zero()).into())?
.take_while(|res| {
res.as_ref().map(|(k, _)| k.transition_id() < to_transition_rev).unwrap_or_default()
})
.collect::<Result<Vec<_>, _>>()?;
let last_indices = storage_changesets
.into_iter()
// reverse so we can get the lowest transition id at which each storage slot changed.
.rev()
// fold all storage slots, keeping the lowest transition index for each
.fold(
BTreeMap::new(),
|mut storages: BTreeMap<(Address, H256), u64>, (index, storage)| {
// we just need the address, storage key and lowest transition id.
storages.insert((index.address(), storage.key), index.transition_id());
storages
},
);
for ((address, storage_key), rem_index) in last_indices {
let shard_part =
unwind_storage_history_shards::<DB>(&mut cursor, address, storage_key, rem_index)?;
// check the last shard_part; if present, its items need to be reinserted.
if !shard_part.is_empty() {
// there are items in list
tx.put::<tables::StorageHistory>(
StorageShardedKey::new(address, storage_key, u64::MAX),
TransitionList::new(shard_part)
.expect("There is at least one element in list and it is sorted."),
)?;
}
}
Ok(UnwindOutput { stage_progress: input.unwind_to })
}
}
/// Load the last shard for the address/storage key pair and remove it from the database so that
/// a new one can be inserted. If the returned list is empty, there were no shards at all.
pub fn take_last_storage_shard<DB: Database>(
tx: &Transaction<'_, DB>,
address: Address,
storage_key: H256,
) -> Result<Vec<u64>, StageError> {
let mut cursor = tx.cursor_read::<tables::StorageHistory>()?;
let last = cursor.seek_exact(StorageShardedKey::new(address, storage_key, u64::MAX))?;
if let Some((storage_shard_key, list)) = last {
// delete old shard so new one can be inserted.
tx.delete::<tables::StorageHistory>(storage_shard_key, None)?;
let list = list.iter(0).map(|i| i as u64).collect::<Vec<_>>();
return Ok(list)
}
Ok(Vec::new())
}
/// Unwind all history shards. For the boundary shard, remove it from the database and
/// return the part of the shard whose items are still valid. If all removed shards were
/// full, the returned list will be empty; that does not mean no shards are left, only
/// that no shard had to be split.
pub fn unwind_storage_history_shards<DB: Database>(
cursor: &mut <<DB as DatabaseGAT<'_>>::TXMut as DbTxMutGAT<'_>>::CursorMut<
tables::StorageHistory,
>,
address: Address,
storage_key: H256,
transition_id: TransitionId,
) -> Result<Vec<usize>, StageError> {
let mut item = cursor.seek_exact(StorageShardedKey::new(address, storage_key, u64::MAX))?;
while let Some((storage_sharded_key, list)) = item {
if storage_sharded_key.address != address ||
storage_sharded_key.sharded_key.key != storage_key
{
// there is no more shard for address and storage_key.
break
}
cursor.delete_current()?;
// check the first item: if it is greater than or equal to `transition_id`, the whole
// shard is unwound, so keep it deleted and move to the previous shard.
let first = list.iter(0).next().expect("List can't be empty");
if first >= transition_id as usize {
item = cursor.prev()?;
continue
} else if transition_id <= storage_sharded_key.sharded_key.highest_transition_id {
// the first element is below `transition_id`, so the shard is only partially
// unwound; keep the still-valid prefix.
return Ok(list.iter(0).take_while(|i| *i < transition_id as usize).collect::<Vec<_>>())
} else {
return Ok(list.iter(0).collect::<Vec<_>>())
}
}
Ok(Vec::new())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::{TestTransaction, PREV_STAGE_ID};
use reth_db::models::{ShardedKey, TransitionIdAddress};
use reth_primitives::{hex_literal::hex, StorageEntry, H160, U256};
const ADDRESS: H160 = H160(hex!("0000000000000000000000000000000000000001"));
const STORAGE_KEY: H256 =
H256(hex!("0000000000000000000000000000000000000000000000000000000000000001"));
fn storage(key: H256) -> StorageEntry {
// Value is not used in indexing stage.
StorageEntry { key, value: U256::ZERO }
}
fn trns(transition_id: u64) -> TransitionIdAddress {
TransitionIdAddress((transition_id, ADDRESS))
}
/// Shard for storage key
fn shard(shard_index: u64) -> StorageShardedKey {
StorageShardedKey {
address: ADDRESS,
sharded_key: ShardedKey { key: STORAGE_KEY, highest_transition_id: shard_index },
}
}
fn list(list: &[usize]) -> TransitionList {
TransitionList::new(list).unwrap()
}
fn cast(
table: Vec<(StorageShardedKey, TransitionList)>,
) -> BTreeMap<StorageShardedKey, Vec<usize>> {
table
.into_iter()
.map(|(k, v)| {
let v = v.iter(0).collect();
(k, v)
})
.collect()
}
fn partial_setup(tx: &TestTransaction) {
// setup
tx.commit(|tx| {
// we only need the first and last mappings: block 0 ends at transition 3, block 5 at transition 7
tx.put::<tables::BlockTransitionIndex>(0, 3).unwrap();
tx.put::<tables::BlockTransitionIndex>(5, 7).unwrap();
// set up the changesets (at transitions 4 and 6) that will be applied to the history index
tx.put::<tables::StorageChangeSet>(trns(4), storage(STORAGE_KEY)).unwrap();
tx.put::<tables::StorageChangeSet>(trns(6), storage(STORAGE_KEY)).unwrap();
Ok(())
})
.unwrap()
}
async fn run(tx: &TestTransaction, run_to: u64) {
let mut input = ExecInput::default();
input.previous_stage = Some((PREV_STAGE_ID, run_to));
let mut stage = IndexStorageHistoryStage::default();
let mut tx = tx.inner();
let out = stage.execute(&mut tx, input).await.unwrap();
assert_eq!(out, ExecOutput { stage_progress: 5, done: true });
tx.commit().unwrap();
}
async fn unwind(tx: &TestTransaction, unwind_from: u64, unwind_to: u64) {
let mut input = UnwindInput::default();
input.stage_progress = unwind_from;
input.unwind_to = unwind_to;
let mut stage = IndexStorageHistoryStage::default();
let mut tx = tx.inner();
let out = stage.unwind(&mut tx, input).await.unwrap();
assert_eq!(out, UnwindOutput { stage_progress: unwind_to });
tx.commit().unwrap();
}
#[tokio::test]
async fn insert_index_to_empty() {
// init
let tx = TestTransaction::default();
// setup
partial_setup(&tx);
// run
run(&tx, 5).await;
// verify
let table = cast(tx.table::<tables::StorageHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![4, 6]),]));
// unwind
unwind(&tx, 5, 0).await;
// verify initial state
let table = tx.table::<tables::StorageHistory>().unwrap();
assert!(table.is_empty());
}
#[tokio::test]
async fn insert_index_to_not_empty_shard() {
// init
let tx = TestTransaction::default();
// setup
partial_setup(&tx);
tx.commit(|tx| {
tx.put::<tables::StorageHistory>(shard(u64::MAX), list(&[1, 2, 3])).unwrap();
Ok(())
})
.unwrap();
// run
run(&tx, 5).await;
// verify
let table = cast(tx.table::<tables::StorageHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3, 4, 6]),]));
// unwind
unwind(&tx, 5, 0).await;
// verify initial state
let table = cast(tx.table::<tables::StorageHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3]),]));
}
#[tokio::test]
async fn insert_index_to_full_shard() {
// init
let tx = TestTransaction::default();
// the value does not matter, only that the storage entry is present in the changeset.
let full_list = vec![3; NUM_OF_INDICES_IN_SHARD];
// setup
partial_setup(&tx);
tx.commit(|tx| {
tx.put::<tables::StorageHistory>(shard(u64::MAX), list(&full_list)).unwrap();
Ok(())
})
.unwrap();
// run
run(&tx, 5).await;
// verify
let table = cast(tx.table::<tables::StorageHistory>().unwrap());
assert_eq!(
table,
BTreeMap::from([(shard(3), full_list.clone()), (shard(u64::MAX), vec![4, 6])])
);
// unwind
unwind(&tx, 5, 0).await;
// verify initial state
let table = cast(tx.table::<tables::StorageHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), full_list)]));
}
#[tokio::test]
async fn insert_index_to_fill_shard() {
// init
let tx = TestTransaction::default();
let mut close_full_list = vec![1; NUM_OF_INDICES_IN_SHARD - 2];
// setup
partial_setup(&tx);
tx.commit(|tx| {
tx.put::<tables::StorageHistory>(shard(u64::MAX), list(&close_full_list)).unwrap();
Ok(())
})
.unwrap();
// run
run(&tx, 5).await;
// verify
close_full_list.push(4);
close_full_list.push(6);
let table = cast(tx.table::<tables::StorageHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), close_full_list.clone()),]));
// unwind
unwind(&tx, 5, 0).await;
// verify initial state
close_full_list.pop();
close_full_list.pop();
let table = cast(tx.table::<tables::StorageHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), close_full_list),]));
}
#[tokio::test]
async fn insert_index_second_half_shard() {
// init
let tx = TestTransaction::default();
let mut close_full_list = vec![1; NUM_OF_INDICES_IN_SHARD - 1];
// setup
partial_setup(&tx);
tx.commit(|tx| {
tx.put::<tables::StorageHistory>(shard(u64::MAX), list(&close_full_list)).unwrap();
Ok(())
})
.unwrap();
// run
run(&tx, 5).await;
// verify
close_full_list.push(4);
let table = cast(tx.table::<tables::StorageHistory>().unwrap());
assert_eq!(
table,
BTreeMap::from([(shard(4), close_full_list.clone()), (shard(u64::MAX), vec![6])])
);
// unwind
unwind(&tx, 5, 0).await;
// verify initial state
close_full_list.pop();
let table = cast(tx.table::<tables::StorageHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), close_full_list),]));
}
#[tokio::test]
async fn insert_index_to_third_shard() {
// init
let tx = TestTransaction::default();
let full_list = vec![1; NUM_OF_INDICES_IN_SHARD];
// setup
partial_setup(&tx);
tx.commit(|tx| {
tx.put::<tables::StorageHistory>(shard(1), list(&full_list)).unwrap();
tx.put::<tables::StorageHistory>(shard(2), list(&full_list)).unwrap();
tx.put::<tables::StorageHistory>(shard(u64::MAX), list(&[2, 3])).unwrap();
Ok(())
})
.unwrap();
run(&tx, 5).await;
// verify
let table = cast(tx.table::<tables::StorageHistory>().unwrap());
assert_eq!(
table,
BTreeMap::from([
(shard(1), full_list.clone()),
(shard(2), full_list.clone()),
(shard(u64::MAX), vec![2, 3, 4, 6])
])
);
// unwind
unwind(&tx, 5, 0).await;
// verify initial state
let table = cast(tx.table::<tables::StorageHistory>().unwrap());
assert_eq!(
table,
BTreeMap::from([
(shard(1), full_list.clone()),
(shard(2), full_list.clone()),
(shard(u64::MAX), vec![2, 3])
])
);
}
}

View File

@@ -8,6 +8,10 @@ pub mod hashing_account;
pub mod hashing_storage;
/// The headers stage.
pub mod headers;
/// Index history of account changes
pub mod index_account_history;
/// Index history of storage changes
pub mod index_storage_history;
/// Intermediate hashes and creating merkle root
pub mod merkle;
/// The sender recovery stage.

View File

@@ -69,7 +69,7 @@ impl TestTransaction {
})
}
/// Return full table as BTreeMap
/// Return full table as Vec
pub(crate) fn table<T: Table>(&self) -> Result<Vec<(T::Key, T::Value)>, DbError>
where
T::Key: Default + Ord,

View File

@@ -593,5 +593,22 @@ mod tests {
let list200: IntegerList = vec![200u64].into();
assert_eq!(list200, list);
}
// Seek greatest index
{
let tx = db.tx().expect(ERROR_INIT_TX);
let mut cursor = tx.cursor_read::<AccountHistory>().unwrap();
// Seek the u64::MAX shard key, then step back with `prev` to land on the
// last real shard.
let _unknown = cursor.seek_exact(ShardedKey::new(real_key, u64::MAX)).unwrap();
let (key, list) = cursor
.prev()
.expect("element should exist.")
.expect("should be able to retrieve it.");
assert_eq!(ShardedKey::new(real_key, 400), key);
let list400: IntegerList = vec![400u64].into();
assert_eq!(list400, list);
}
}
}

View File

@@ -21,7 +21,7 @@ use reth_primitives::{
TransactionSigned, TransitionId, TxHash, TxNumber, H256,
};
use self::models::StoredBlockBody;
use self::models::{storage_sharded_key::StorageShardedKey, StoredBlockBody};
/// Enum for the types of tables present in libmdbx.
#[derive(Debug)]
@@ -198,54 +198,47 @@
);
table!(
/// Stores the transaction numbers that changed each account.
/// Stores pointers to the transition changesets with changes for each account key.
///
/// ```
/// use reth_primitives::{Address, IntegerList};
/// use reth_db::{transaction::{DbTxMut,DbTx}, mdbx::{EnvKind, Env, test_utils,WriteMap}, cursor::DbCursorRO,database::Database, tables::{AccountHistory,models::ShardedKey}};
/// use std::{str::FromStr,sync::Arc};
/// The last shard key for an account contains `u64::MAX` as the `TransitionId`,
/// which allows a small optimization on db access when the change is in the plain state.
///
/// Imagine having shards as:
/// * `Address | 100`
/// * `Address | u64::MAX`
///
/// fn main() {
/// let db: Arc<Env<WriteMap>> = test_utils::create_test_db(EnvKind::RW);
/// let account = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047").unwrap();
///
/// // Setup if each shard can only take 1 transaction.
/// for i in 1..3 {
/// let key = ShardedKey::new(account, i * 100);
/// let list: IntegerList = vec![i * 100u64].into();
/// db.update(|tx| tx.put::<AccountHistory>(key.clone(), list.clone()).expect("")).unwrap();
/// }
///
/// // Is there any transaction after number 150 that changed this account?
/// {
/// let tx = db.tx().expect("");
/// let mut cursor = tx.cursor_read::<AccountHistory>().unwrap();
/// // It will seek the one greater or equal to the query. Since we have `Address | 100`,
/// // `Address | 200` in the database and we're querying `Address | 150` it will return us
/// // `Address | 200`.
/// let mut walker = cursor.walk(ShardedKey::new(account, 150)).unwrap();
/// let (key, list) = walker
/// .next()
/// .expect("element should exist.")
/// .expect("should be able to retrieve it.");
/// assert_eq!(ShardedKey::new(account, 200), key);
/// let list200: IntegerList = vec![200u64].into();
/// assert_eq!(list200, list);
/// assert!(walker.next().is_none());
/// }
/// }
/// ```
/// What we need to find is the first id greater than N. The db `seek` function fetches the
/// shard whose key is equal to or greater than the one asked for. For example:
/// * For N=50 we would get the first shard.
/// * For N=150 we would get the second shard.
/// * If the max transition id is 200 and we ask for N=250, we would fetch the last shard and
///   know that the needed entry is in `AccountPlainState`.
/// * If there were no shards, we would get `None` or an entry for a different key.
///
/// Code example can be found in `reth_provider::HistoricalStateProviderRef`
( AccountHistory ) ShardedKey<Address> | TransitionList
);
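To make the seek semantics above concrete, here is a hedged sketch that emulates the lookup with a `BTreeMap` standing in for the table and `range` standing in for the db cursor's `seek` (types deliberately simplified; this is not the real cursor API):

use std::collections::BTreeMap;

// Keys are (account, highest_transition_id); values are the index shards.
fn find_shard(table: &BTreeMap<(u64, u64), Vec<u64>>, account: u64, n: u64) -> Option<&Vec<u64>> {
    table
        .range((account, n)..)
        .next()
        // an entry for a different account means there is no shard covering `n`
        .filter(|((acc, _), _)| *acc == account)
        .map(|(_, shard)| shard)
}

If the hit is the `u64::MAX` shard and every index in it is below `n`, the latest value is the one in the plain state.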
table!(
/// Stores pointers to transactions that changed each storage key.
( StorageHistory ) AddressStorageKey | TransitionList
/// Stores pointers to transition changeset with changes for each storage key.
///
/// The last shard key for a storage slot contains `u64::MAX` as the `TransitionId`,
/// which allows a small optimization on db access when the change is in the plain state.
///
/// Imagine having shards as:
/// * `Address | StorageKey | 100`
/// * `Address | StorageKey | u64::MAX`
///
/// What we need to find is the first id greater than N. The db `seek` function fetches the
/// shard whose key is equal to or greater than the one asked for. For example:
/// * For N=50 we would get the first shard.
/// * For N=150 we would get the second shard.
/// * If the max transition id is 200 and we ask for N=250, we would fetch the last shard and
///   know that the needed entry is in `StoragePlainState`.
/// * If there were no shards, we would get `None` or an entry for a different storage key.
///
/// Code example can be found in `reth_provider::HistoricalStateProviderRef`
( StorageHistory ) StorageShardedKey | TransitionList
);
dupsort!(
@@ -314,6 +307,4 @@ pub type ConfigValue = Vec<u8>;
/// Temporary placeholder type for DB.
pub type BlockNumHashTxNumber = Vec<u8>;
/// Temporary placeholder type for DB.
pub type AddressStorageKey = Vec<u8>;
/// Temporary placeholder type for DB.
pub type Bytecode = Vec<u8>;

View File

@@ -4,6 +4,7 @@ pub mod accounts;
pub mod blocks;
pub mod integer_list;
pub mod sharded_key;
pub mod storage_sharded_key;
pub use accounts::*;
pub use blocks::*;

View File

@@ -4,26 +4,29 @@ use crate::{
table::{Decode, Encode},
Error,
};
use reth_primitives::TxNumber;
use reth_primitives::TransitionId;
/// Number of indices in one shard.
pub const NUM_OF_INDICES_IN_SHARD: usize = 100;
/// Sometimes data can be too big to be saved for a single key. This helps out by dividing the data
/// into different shards. Example:
///
/// `Address | 200` -> data is from transaction 0 to 200.
/// `Address | 200` -> data is from transition 0 to 200.
///
/// `Address | 300` -> data is from transaction 201 to 300.
#[derive(Debug, Default, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct ShardedKey<T> {
/// The key for this type.
pub key: T,
/// Highest tx number to which `value` is related to.
pub highest_tx_number: TxNumber,
/// Highest transition id to which `value` is related.
pub highest_transition_id: TransitionId,
}
impl<T> ShardedKey<T> {
/// Creates a new `ShardedKey<T>`.
pub fn new(key: T, highest_tx_number: TxNumber) -> Self {
ShardedKey { key, highest_tx_number }
pub fn new(key: T, highest_transition_id: TransitionId) -> Self {
ShardedKey { key, highest_transition_id }
}
}
@@ -36,7 +39,7 @@
fn encode(self) -> Self::Encoded {
let mut buf: Vec<u8> = Encode::encode(self.key).into();
buf.extend_from_slice(&self.highest_tx_number.to_be_bytes());
buf.extend_from_slice(&self.highest_transition_id.to_be_bytes());
buf
}
}
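The `to_be_bytes` suffix is load-bearing here: big-endian bytes compare lexicographically in the same order as the integers they encode, so the db's byte-wise key ordering matches transition-id ordering and `seek` behaves as the table docs describe. A quick self-contained check of that property (illustration only):

fn main() {
    // numeric order: 1 < 256
    assert!(1u64.to_be_bytes() < 256u64.to_be_bytes()); // big-endian agrees
    assert!(256u64.to_le_bytes() < 1u64.to_le_bytes()); // little-endian would not
}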

View File

@@ -0,0 +1,59 @@
//! Storage sharded key
use crate::{
table::{Decode, Encode},
Error,
};
use reth_primitives::{TransitionId, H160, H256};
use super::ShardedKey;
/// Number of indices in one shard.
pub const NUM_OF_INDICES_IN_SHARD: usize = 100;
/// Sometimes data can be too big to be saved for a single key. This helps out by dividing the data
/// into different shards. Example:
///
/// `Address | StorageKey | 200` -> data is from transition 0 to 200.
///
/// `Address | StorageKey | 300` -> data is from transition 201 to 300.
#[derive(Debug, Default, Clone, Eq, Ord, PartialOrd, PartialEq)]
pub struct StorageShardedKey {
/// Storage account address.
pub address: H160,
/// Storage slot with highest transition id.
pub sharded_key: ShardedKey<H256>,
}
impl StorageShardedKey {
/// Creates a new `StorageShardedKey`.
pub fn new(address: H160, storage_key: H256, highest_transition_id: TransitionId) -> Self {
Self { address, sharded_key: ShardedKey { key: storage_key, highest_transition_id } }
}
}
impl Encode for StorageShardedKey {
type Encoded = Vec<u8>;
fn encode(self) -> Self::Encoded {
let mut buf: Vec<u8> = Encode::encode(self.address).into();
buf.extend_from_slice(&Encode::encode(self.sharded_key.key));
buf.extend_from_slice(&self.sharded_key.highest_transition_id.to_be_bytes());
buf
}
}
impl Decode for StorageShardedKey {
fn decode<B: Into<bytes::Bytes>>(value: B) -> Result<Self, Error> {
let value: bytes::Bytes = value.into();
let transition_id_index = value.len() - 8;
let highest_transition_id = u64::from_be_bytes(
value.as_ref()[transition_id_index..].try_into().map_err(|_| Error::DecodeError)?,
);
let address = H160::decode(value.slice(..20))?;
let storage_key = H256::decode(value.slice(20..52))?;
Ok(Self { address, sharded_key: ShardedKey::new(storage_key, highest_transition_id) })
}
}
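The encode/decode pair above agrees on a fixed 60-byte layout: 20 address bytes, then 32 storage-key bytes, then 8 big-endian transition-id bytes. A minimal sketch of that layout on plain byte arrays (hypothetical helpers that bypass the `Encode`/`Decode` traits):

// [ address: 20 bytes | storage key: 32 bytes | transition id: 8 bytes, big-endian ]
fn encode_key(address: [u8; 20], storage_key: [u8; 32], transition_id: u64) -> Vec<u8> {
    let mut buf = Vec::with_capacity(60);
    buf.extend_from_slice(&address);
    buf.extend_from_slice(&storage_key);
    buf.extend_from_slice(&transition_id.to_be_bytes());
    buf
}

fn decode_transition_id(buf: &[u8]) -> u64 {
    // mirrors `StorageShardedKey::decode`, which reads the trailing 8 bytes
    u64::from_be_bytes(buf[buf.len() - 8..].try_into().unwrap())
}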

View File

@@ -1,6 +1,7 @@
use crate::{AccountProvider, BlockHashProvider, StateProvider};
use reth_db::{
cursor::{DbCursorRO, DbDupCursorRO},
models::storage_sharded_key::StorageShardedKey,
tables,
transaction::DbTx,
};
@@ -53,8 +54,9 @@ impl<'a, 'b, TX: DbTx<'a>> StateProvider for HistoricalStateProviderRef<'a, 'b,
/// Get storage.
fn storage(&self, account: Address, storage_key: StorageKey) -> Result<Option<StorageValue>> {
// TODO when StorageHistory is defined
let transition_id = StorageShardedKey::new(account, storage_key, self.transition);
let transaction_number =
self.tx.get::<tables::StorageHistory>(Vec::new())?.map(|_integer_list|
self.tx.get::<tables::StorageHistory>(transition_id)?.map(|_integer_list|
// TODO select integer that is one less from transaction_number <- // TODO: (rkrasiuk) not sure this comment is still relevant
self.transition);