fix: storage hashing stage gets stuck in a loop on big contracts (#1362)

This commit is contained in:
joshieDo
2023-02-16 03:23:30 +08:00
committed by GitHub
parent 6da8967082
commit 9842f15ee0
2 changed files with 92 additions and 72 deletions

View File

@ -6,9 +6,8 @@ use crate::{
use eyre::Result;
use reth_db::{database::Database, table::TableImporter, tables};
use reth_provider::Transaction;
use reth_stages::{stages::StorageHashingStage, Stage, StageId, UnwindInput};
use reth_stages::{stages::StorageHashingStage, Stage, UnwindInput};
use std::ops::DerefMut;
use tracing::info;
pub(crate) async fn dump_hashing_storage_stage<DB: Database>(
db_tool: &mut DbTool<'_, DB>,
@ -17,15 +16,14 @@ pub(crate) async fn dump_hashing_storage_stage<DB: Database>(
output_db: &PlatformPath<DbPath>,
should_run: bool,
) -> Result<()> {
if should_run {
eyre::bail!("StorageHashing stage does not support dry run.")
}
let (output_db, tip_block_number) = setup::<DB>(from, to, output_db, db_tool)?;
unwind_and_copy::<DB>(db_tool, from, tip_block_number, &output_db).await?;
// Try to re-execute the stage without committing
if should_run {
dry_run(output_db, to, from).await?;
}
Ok(())
}
@ -55,31 +53,3 @@ async fn unwind_and_copy<DB: Database>(
Ok(())
}
/// Re-executes the storage hashing stage against the dumped output database
/// without committing the results (a dry run).
async fn dry_run(
    output_db: reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
    to: u64,
    from: u64,
) -> eyre::Result<()> {
    info!(target: "reth::cli", "Executing stage. [dry-run]");

    let mut exec_tx = Transaction::new(&output_db)?;

    // clean_threshold of 1 forces the full (clean) hashing path on every run.
    let mut hashing_stage = StorageHashingStage { clean_threshold: 1, ..Default::default() };

    let input = reth_stages::ExecInput {
        previous_stage: Some((StageId("Another"), to)),
        stage_progress: Some(from),
    };
    hashing_stage.execute(&mut exec_tx, input).await?;

    // Discard the transaction instead of committing — this is only a dry run.
    exec_tx.drop()?;

    info!(target: "reth::cli", "Success.");
    Ok(())
}

View File

@ -1,4 +1,5 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use num_traits::Zero;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
database::Database,
@ -24,7 +25,7 @@ pub struct StorageHashingStage {
/// The threshold (in number of state transitions) for switching between incremental
/// hashing and full storage hashing.
pub clean_threshold: u64,
/// The maximum number of blocks to process before committing.
/// The maximum number of slots to process before committing.
pub commit_threshold: u64,
}
@ -62,38 +63,74 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
tx.clear::<tables::HashedStorage>()?;
tx.commit()?;
let mut first_key = None;
let mut current_key = None;
let mut current_subkey = None;
let mut keccak_address = None;
loop {
let next_key = {
let mut hashed_batch = BTreeMap::new();
let mut remaining = self.commit_threshold as usize;
{
let mut storage = tx.cursor_dup_read::<tables::PlainStorageState>()?;
while !remaining.is_zero() {
hashed_batch.extend(
storage
.walk_dup(current_key, current_subkey)?
.take(remaining)
.map(|res| {
res.map(|(address, slot)| {
// On the first iteration (when current_key is None) no address hash
// has been cached yet, so compute keccak256(address) inline.
let keccak_address =
if let Some(keccak_address) = keccak_address {
keccak_address
} else {
keccak256(address)
};
let hashed_batch = storage
.walk(first_key)?
.take(self.commit_threshold as usize)
.map(|res| {
res.map(|(address, slot)| {
// both account address and storage slot key are hashed for merkle
// tree.
((keccak256(address), keccak256(slot.key)), slot.value)
})
})
.collect::<Result<BTreeMap<_, _>, _>>()?;
// TODO cache map keccak256(slot.key) ?
((keccak_address, keccak256(slot.key)), slot.value)
})
})
.collect::<Result<BTreeMap<_, _>, _>>()?,
);
// next key of iterator
let next_key = storage.next()?;
remaining = self.commit_threshold as usize - hashed_batch.len();
if let Some((address, slot)) = storage.next_dup()? {
// There's still some remaining elements on this key, so we need to save
// the cursor position for the next
// iteration
current_key = Some(address);
current_subkey = Some(slot.key);
} else {
// Go to the next key
current_key = storage.next_no_dup()?.map(|(key, _)| key);
current_subkey = None;
// Cache keccak256(address) for the next key if it exists
if let Some(address) = current_key {
keccak_address = Some(keccak256(address));
} else {
// We have reached the end of table
break
}
}
}
}
// iterate and put presorted hashed slots
hashed_batch.into_iter().try_for_each(|((addr, key), value)| {
tx.put::<tables::HashedStorage>(addr, StorageEntry { key, value })
})?;
// iterate and put presorted hashed slots
hashed_batch.into_iter().try_for_each(|((addr, key), value)| {
tx.put::<tables::HashedStorage>(addr, StorageEntry { key, value })
})?;
next_key.map(|(key, _)| key)
};
tx.commit()?;
first_key = match next_key {
Some(key) => Some(key),
None => break,
};
// We have reached the end of table
if current_key.is_none() {
break
}
}
} else {
let mut plain_storage = tx.cursor_dup_read::<tables::PlainStorageState>()?;
@ -241,8 +278,14 @@ mod tests {
// Set up the runner
let mut runner = StorageHashingTestRunner::default();
// set low threshold so we hash the whole storage
// set low clean threshold so we hash the whole storage
runner.set_clean_threshold(1);
// set a low commit threshold so that every entry triggers a tx.commit,
// ensuring we don't hang on a single key. Seed execution inserts more than
// one storage entry per address.
runner.set_commit_threshold(1);
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
@ -326,16 +369,19 @@ mod tests {
.get_mut(rand::random::<usize>() % n_accounts as usize)
.unwrap();
let new_entry = StorageEntry {
key: keccak256([rand::random::<u8>()]),
value: U256::from(rand::random::<u8>() % 30 + 1),
};
self.insert_storage_entry(
tx,
(transition_id, *addr).into(),
new_entry,
progress.header.number == stage_progress,
)?;
for _ in 0..2 {
let new_entry = StorageEntry {
key: keccak256([rand::random::<u8>()]),
value: U256::from(rand::random::<u8>() % 30 + 1),
};
self.insert_storage_entry(
tx,
(transition_id, *addr).into(),
new_entry,
progress.header.number == stage_progress,
)?;
}
tx_id += 1;
transition_id += 1;
Ok(())
@ -392,6 +438,10 @@ mod tests {
self.clean_threshold = threshold;
}
/// Sets the runner's commit threshold — the maximum number of storage
/// slots the stage processes before committing.
fn set_commit_threshold(&mut self, threshold: u64) {
self.commit_threshold = threshold;
}
fn check_hashed_storage(&self) -> Result<(), TestRunnerError> {
self.tx
.query(|tx| {