mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat: add ETL to Hashing Stages (#7030)
This commit is contained in:
@ -1,30 +1,34 @@
|
||||
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
|
||||
use itertools::Itertools;
|
||||
use rayon::slice::ParallelSliceMut;
|
||||
use reth_config::config::EtlConfig;
|
||||
use reth_db::{
|
||||
cursor::{DbCursorRO, DbCursorRW},
|
||||
database::Database,
|
||||
tables,
|
||||
transaction::{DbTx, DbTxMut},
|
||||
RawKey, RawTable,
|
||||
RawKey, RawTable, RawValue,
|
||||
};
|
||||
use reth_etl::Collector;
|
||||
use reth_interfaces::provider::ProviderResult;
|
||||
use reth_primitives::{
|
||||
keccak256,
|
||||
stage::{
|
||||
AccountHashingCheckpoint, CheckpointBlockRange, EntitiesCheckpoint, StageCheckpoint,
|
||||
StageId,
|
||||
},
|
||||
stage::{AccountHashingCheckpoint, EntitiesCheckpoint, StageCheckpoint, StageId},
|
||||
Account, B256,
|
||||
};
|
||||
use reth_provider::{AccountExtReader, DatabaseProviderRW, HashingWriter, StatsReader};
|
||||
use std::{
|
||||
cmp::max,
|
||||
fmt::Debug,
|
||||
ops::{Range, RangeInclusive},
|
||||
sync::mpsc,
|
||||
sync::mpsc::{self, Receiver},
|
||||
};
|
||||
use tracing::*;
|
||||
|
||||
/// Maximum number of channels that can exist in memory.
|
||||
const MAXIMUM_CHANNELS: usize = 10_000;
|
||||
|
||||
/// Maximum number of accounts to hash per rayon worker job.
|
||||
const WORKER_CHUNK_SIZE: usize = 100;
|
||||
|
||||
/// Account hashing stage hashes plain account.
|
||||
/// This is preparation before generating intermediate hashes and calculating Merkle tree root.
|
||||
#[derive(Clone, Debug)]
|
||||
@ -32,20 +36,26 @@ pub struct AccountHashingStage {
|
||||
/// The threshold (in number of blocks) for switching between incremental
|
||||
/// hashing and full storage hashing.
|
||||
pub clean_threshold: u64,
|
||||
/// The maximum number of accounts to process before committing.
|
||||
/// The maximum number of accounts to process before committing during unwind.
|
||||
pub commit_threshold: u64,
|
||||
/// ETL configuration
|
||||
pub etl_config: EtlConfig,
|
||||
}
|
||||
|
||||
impl AccountHashingStage {
|
||||
/// Create new instance of [AccountHashingStage].
|
||||
pub fn new(clean_threshold: u64, commit_threshold: u64) -> Self {
|
||||
Self { clean_threshold, commit_threshold }
|
||||
pub fn new(clean_threshold: u64, commit_threshold: u64, etl_config: EtlConfig) -> Self {
|
||||
Self { clean_threshold, commit_threshold, etl_config }
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for AccountHashingStage {
|
||||
fn default() -> Self {
|
||||
Self { clean_threshold: 500_000, commit_threshold: 100_000 }
|
||||
Self {
|
||||
clean_threshold: 500_000,
|
||||
commit_threshold: 100_000,
|
||||
etl_config: EtlConfig::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -87,7 +97,7 @@ impl AccountHashingStage {
|
||||
generators,
|
||||
generators::{random_block_range, random_eoa_accounts},
|
||||
};
|
||||
use reth_primitives::{Account, B256, U256};
|
||||
use reth_primitives::U256;
|
||||
use reth_provider::providers::StaticFileWriter;
|
||||
|
||||
let mut rng = generators::rng();
|
||||
@ -155,96 +165,45 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
|
||||
// genesis accounts are not in changeset.
|
||||
if to_block - from_block > self.clean_threshold || from_block == 1 {
|
||||
let tx = provider.tx_ref();
|
||||
let stage_checkpoint = input
|
||||
.checkpoint
|
||||
.and_then(|checkpoint| checkpoint.account_hashing_stage_checkpoint());
|
||||
|
||||
let start_address = match stage_checkpoint {
|
||||
Some(AccountHashingCheckpoint { address: address @ Some(_), block_range: CheckpointBlockRange { from, to }, .. })
|
||||
// Checkpoint is only valid if the range of transitions didn't change.
|
||||
// An already hashed account may have been changed with the new range,
|
||||
// and therefore should be hashed again.
|
||||
if from == from_block && to == to_block =>
|
||||
{
|
||||
debug!(target: "sync::stages::account_hashing::exec", checkpoint = ?stage_checkpoint, "Continuing inner account hashing checkpoint");
|
||||
// clear table, load all accounts and hash it
|
||||
tx.clear::<tables::HashedAccounts>()?;
|
||||
|
||||
address
|
||||
}
|
||||
_ => {
|
||||
// clear table, load all accounts and hash it
|
||||
tx.clear::<tables::HashedAccounts>()?;
|
||||
let mut accounts_cursor = tx.cursor_read::<RawTable<tables::PlainAccountState>>()?;
|
||||
let mut collector =
|
||||
Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
|
||||
let mut channels = Vec::with_capacity(MAXIMUM_CHANNELS);
|
||||
|
||||
None
|
||||
// channels used to return result of account hashing
|
||||
for chunk in &accounts_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
|
||||
// An _unordered_ channel to receive results from a rayon job
|
||||
let (tx, rx) = mpsc::channel();
|
||||
channels.push(rx);
|
||||
|
||||
let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
|
||||
// Spawn the hashing task onto the global rayon pool
|
||||
rayon::spawn(move || {
|
||||
for (address, account) in chunk.into_iter() {
|
||||
let address = address.key().unwrap();
|
||||
let _ = tx.send((RawKey::new(keccak256(address)), account));
|
||||
}
|
||||
});
|
||||
|
||||
// Flush to ETL when channels length reaches MAXIMUM_CHANNELS
|
||||
if !channels.is_empty() && channels.len() % MAXIMUM_CHANNELS == 0 {
|
||||
collect(&mut channels, &mut collector)?;
|
||||
}
|
||||
}
|
||||
.take()
|
||||
.map(RawKey::new);
|
||||
|
||||
let next_address = {
|
||||
let mut accounts_cursor =
|
||||
tx.cursor_read::<RawTable<tables::PlainAccountState>>()?;
|
||||
collect(&mut channels, &mut collector)?;
|
||||
|
||||
// channels used to return result of account hashing
|
||||
let mut channels = Vec::new();
|
||||
for chunk in &accounts_cursor
|
||||
.walk(start_address.clone())?
|
||||
.take(self.commit_threshold as usize)
|
||||
.chunks(
|
||||
max(self.commit_threshold as usize, rayon::current_num_threads()) /
|
||||
rayon::current_num_threads(),
|
||||
)
|
||||
{
|
||||
// An _unordered_ channel to receive results from a rayon job
|
||||
let (tx, rx) = mpsc::channel();
|
||||
channels.push(rx);
|
||||
let mut hashed_account_cursor =
|
||||
tx.cursor_write::<RawTable<tables::HashedAccounts>>()?;
|
||||
|
||||
let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
|
||||
// Spawn the hashing task onto the global rayon pool
|
||||
rayon::spawn(move || {
|
||||
for (address, account) in chunk.into_iter() {
|
||||
let address = address.key().unwrap();
|
||||
let _ = tx.send((RawKey::new(keccak256(address)), account));
|
||||
}
|
||||
});
|
||||
}
|
||||
let mut hashed_batch = Vec::with_capacity(self.commit_threshold as usize);
|
||||
|
||||
// Iterate over channels and append the hashed accounts.
|
||||
for channel in channels {
|
||||
while let Ok(hashed) = channel.recv() {
|
||||
hashed_batch.push(hashed);
|
||||
}
|
||||
}
|
||||
// sort it all in parallel
|
||||
hashed_batch.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
|
||||
|
||||
let mut hashed_account_cursor =
|
||||
tx.cursor_write::<RawTable<tables::HashedAccounts>>()?;
|
||||
|
||||
// iterate and put presorted hashed accounts
|
||||
if start_address.is_none() {
|
||||
hashed_batch
|
||||
.into_iter()
|
||||
.try_for_each(|(k, v)| hashed_account_cursor.append(k, v))?;
|
||||
} else {
|
||||
hashed_batch
|
||||
.into_iter()
|
||||
.try_for_each(|(k, v)| hashed_account_cursor.insert(k, v))?;
|
||||
}
|
||||
// next key of iterator
|
||||
accounts_cursor.next()?
|
||||
};
|
||||
|
||||
if let Some((next_address, _)) = &next_address {
|
||||
let checkpoint = input.checkpoint().with_account_hashing_stage_checkpoint(
|
||||
AccountHashingCheckpoint {
|
||||
address: Some(next_address.key().unwrap()),
|
||||
block_range: CheckpointBlockRange { from: from_block, to: to_block },
|
||||
progress: stage_checkpoint_progress(provider)?,
|
||||
},
|
||||
);
|
||||
|
||||
return Ok(ExecOutput { checkpoint, done: false })
|
||||
for item in collector.iter()? {
|
||||
let (key, value) = item?;
|
||||
hashed_account_cursor
|
||||
.append(RawKey::<B256>::from_vec(key), RawValue::<Account>::from_vec(value))?;
|
||||
}
|
||||
} else {
|
||||
// Aggregate all transition changesets and make a list of accounts that have been
|
||||
@ -293,6 +252,21 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
|
||||
}
|
||||
}
|
||||
|
||||
/// Flushes channels hashes to ETL collector.
|
||||
fn collect(
|
||||
channels: &mut Vec<Receiver<(RawKey<B256>, RawValue<Account>)>>,
|
||||
collector: &mut Collector<RawKey<B256>, RawValue<Account>>,
|
||||
) -> Result<(), StageError> {
|
||||
for channel in channels.iter_mut() {
|
||||
while let Ok((key, v)) = channel.recv() {
|
||||
collector.insert(key, v)?;
|
||||
}
|
||||
}
|
||||
debug!(target: "sync::stages::hashing_account", "Hashed {} entries", collector.len());
|
||||
channels.clear();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn stage_checkpoint_progress<DB: Database>(
|
||||
provider: &DatabaseProviderRW<DB>,
|
||||
) -> ProviderResult<EntitiesCheckpoint> {
|
||||
@ -356,91 +330,6 @@ mod tests {
|
||||
assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn execute_clean_account_hashing_with_commit_threshold() {
|
||||
let (previous_stage, stage_progress) = (20, 10);
|
||||
// Set up the runner
|
||||
let mut runner = AccountHashingTestRunner::default();
|
||||
runner.set_clean_threshold(1);
|
||||
runner.set_commit_threshold(5);
|
||||
|
||||
let mut input = ExecInput {
|
||||
target: Some(previous_stage),
|
||||
checkpoint: Some(StageCheckpoint::new(stage_progress)),
|
||||
};
|
||||
|
||||
runner.seed_execution(input).expect("failed to seed execution");
|
||||
|
||||
// first run, hash first five accounts.
|
||||
let rx = runner.execute(input);
|
||||
let result = rx.await.unwrap();
|
||||
|
||||
let fifth_address = runner
|
||||
.db
|
||||
.query(|tx| {
|
||||
let (address, _) = tx
|
||||
.cursor_read::<tables::PlainAccountState>()?
|
||||
.walk(None)?
|
||||
.nth(5)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
Ok(address)
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
assert_matches!(
|
||||
result,
|
||||
Ok(ExecOutput {
|
||||
checkpoint: StageCheckpoint {
|
||||
block_number: 10,
|
||||
stage_checkpoint: Some(StageUnitCheckpoint::Account(
|
||||
AccountHashingCheckpoint {
|
||||
address: Some(address),
|
||||
block_range: CheckpointBlockRange {
|
||||
from: 11,
|
||||
to: 20,
|
||||
},
|
||||
progress: EntitiesCheckpoint { processed: 5, total }
|
||||
}
|
||||
))
|
||||
},
|
||||
done: false
|
||||
}) if address == fifth_address &&
|
||||
total == runner.db.table::<tables::PlainAccountState>().unwrap().len() as u64
|
||||
);
|
||||
assert_eq!(runner.db.table::<tables::HashedAccounts>().unwrap().len(), 5);
|
||||
|
||||
// second run, hash next five accounts.
|
||||
input.checkpoint = Some(result.unwrap().checkpoint);
|
||||
let rx = runner.execute(input);
|
||||
let result = rx.await.unwrap();
|
||||
|
||||
assert_matches!(
|
||||
result,
|
||||
Ok(ExecOutput {
|
||||
checkpoint: StageCheckpoint {
|
||||
block_number: 20,
|
||||
stage_checkpoint: Some(StageUnitCheckpoint::Account(
|
||||
AccountHashingCheckpoint {
|
||||
address: None,
|
||||
block_range: CheckpointBlockRange {
|
||||
from: 0,
|
||||
to: 0,
|
||||
},
|
||||
progress: EntitiesCheckpoint { processed, total }
|
||||
}
|
||||
))
|
||||
},
|
||||
done: true
|
||||
}) if processed == total &&
|
||||
total == runner.db.table::<tables::PlainAccountState>().unwrap().len() as u64
|
||||
);
|
||||
assert_eq!(runner.db.table::<tables::HashedAccounts>().unwrap().len(), 10);
|
||||
|
||||
// Validate the stage execution
|
||||
assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
|
||||
}
|
||||
|
||||
mod test_utils {
|
||||
use super::*;
|
||||
use crate::test_utils::TestStageDB;
|
||||
@ -450,6 +339,7 @@ mod tests {
|
||||
pub(crate) db: TestStageDB,
|
||||
commit_threshold: u64,
|
||||
clean_threshold: u64,
|
||||
etl_config: EtlConfig,
|
||||
}
|
||||
|
||||
impl AccountHashingTestRunner {
|
||||
@ -509,7 +399,12 @@ mod tests {
|
||||
|
||||
impl Default for AccountHashingTestRunner {
|
||||
fn default() -> Self {
|
||||
Self { db: TestStageDB::default(), commit_threshold: 1000, clean_threshold: 1000 }
|
||||
Self {
|
||||
db: TestStageDB::default(),
|
||||
commit_threshold: 1000,
|
||||
clean_threshold: 1000,
|
||||
etl_config: EtlConfig::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -524,6 +419,7 @@ mod tests {
|
||||
Self::S {
|
||||
commit_threshold: self.commit_threshold,
|
||||
clean_threshold: self.clean_threshold,
|
||||
etl_config: self.etl_config.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,25 +1,35 @@
|
||||
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
|
||||
use num_traits::Zero;
|
||||
use itertools::Itertools;
|
||||
use reth_config::config::EtlConfig;
|
||||
use reth_db::{
|
||||
cursor::DbDupCursorRO,
|
||||
codecs::CompactU256,
|
||||
cursor::{DbCursorRO, DbDupCursorRW},
|
||||
database::Database,
|
||||
models::BlockNumberAddress,
|
||||
table::Decompress,
|
||||
tables,
|
||||
transaction::{DbTx, DbTxMut},
|
||||
};
|
||||
use reth_etl::Collector;
|
||||
use reth_interfaces::provider::ProviderResult;
|
||||
use reth_primitives::{
|
||||
keccak256,
|
||||
stage::{
|
||||
CheckpointBlockRange, EntitiesCheckpoint, StageCheckpoint, StageId,
|
||||
StorageHashingCheckpoint,
|
||||
},
|
||||
StorageEntry,
|
||||
stage::{EntitiesCheckpoint, StageCheckpoint, StageId, StorageHashingCheckpoint},
|
||||
BufMut, StorageEntry, B256,
|
||||
};
|
||||
use reth_provider::{DatabaseProviderRW, HashingWriter, StatsReader, StorageReader};
|
||||
use std::{collections::BTreeMap, fmt::Debug};
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
sync::mpsc::{self, Receiver},
|
||||
};
|
||||
use tracing::*;
|
||||
|
||||
/// Maximum number of channels that can exist in memory.
|
||||
const MAXIMUM_CHANNELS: usize = 10_000;
|
||||
|
||||
/// Maximum number of storage entries to hash per rayon worker job.
|
||||
const WORKER_CHUNK_SIZE: usize = 100;
|
||||
|
||||
/// Storage hashing stage hashes plain storage.
|
||||
/// This is preparation before generating intermediate hashes and calculating Merkle tree root.
|
||||
#[derive(Debug)]
|
||||
@ -27,20 +37,26 @@ pub struct StorageHashingStage {
|
||||
/// The threshold (in number of blocks) for switching between incremental
|
||||
/// hashing and full storage hashing.
|
||||
pub clean_threshold: u64,
|
||||
/// The maximum number of slots to process before committing.
|
||||
/// The maximum number of slots to process before committing during unwind.
|
||||
pub commit_threshold: u64,
|
||||
/// ETL configuration
|
||||
pub etl_config: EtlConfig,
|
||||
}
|
||||
|
||||
impl StorageHashingStage {
|
||||
/// Create new instance of [StorageHashingStage].
|
||||
pub fn new(clean_threshold: u64, commit_threshold: u64) -> Self {
|
||||
Self { clean_threshold, commit_threshold }
|
||||
pub fn new(clean_threshold: u64, commit_threshold: u64, etl_config: EtlConfig) -> Self {
|
||||
Self { clean_threshold, commit_threshold, etl_config }
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for StorageHashingStage {
|
||||
fn default() -> Self {
|
||||
Self { clean_threshold: 500_000, commit_threshold: 100_000 }
|
||||
Self {
|
||||
clean_threshold: 500_000,
|
||||
commit_threshold: 100_000,
|
||||
etl_config: EtlConfig::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -68,104 +84,48 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
|
||||
// AccountHashing table. Also, if we start from genesis, we need to hash from scratch, as
|
||||
// genesis accounts are not in changeset, along with their storages.
|
||||
if to_block - from_block > self.clean_threshold || from_block == 1 {
|
||||
let stage_checkpoint = input
|
||||
.checkpoint
|
||||
.and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint());
|
||||
// clear table, load all accounts and hash it
|
||||
tx.clear::<tables::HashedStorages>()?;
|
||||
|
||||
let (mut current_key, mut current_subkey) = match stage_checkpoint {
|
||||
Some(StorageHashingCheckpoint {
|
||||
address: address @ Some(_),
|
||||
storage,
|
||||
block_range: CheckpointBlockRange { from, to },
|
||||
..
|
||||
})
|
||||
// Checkpoint is only valid if the range of transitions didn't change.
|
||||
// An already hashed storage may have been changed with the new range,
|
||||
// and therefore should be hashed again.
|
||||
if from == from_block && to == to_block =>
|
||||
{
|
||||
debug!(target: "sync::stages::storage_hashing::exec", checkpoint = ?stage_checkpoint, "Continuing inner storage hashing checkpoint");
|
||||
let mut storage_cursor = tx.cursor_read::<tables::PlainStorageState>()?;
|
||||
let mut collector =
|
||||
Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
|
||||
let mut channels = Vec::with_capacity(MAXIMUM_CHANNELS);
|
||||
|
||||
(address, storage)
|
||||
for chunk in &storage_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
|
||||
// An _unordered_ channel to receive results from a rayon job
|
||||
let (tx, rx) = mpsc::channel();
|
||||
channels.push(rx);
|
||||
|
||||
let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
|
||||
// Spawn the hashing task onto the global rayon pool
|
||||
rayon::spawn(move || {
|
||||
for (address, slot) in chunk.into_iter() {
|
||||
let mut addr_key = Vec::with_capacity(64);
|
||||
addr_key.put_slice(keccak256(address).as_slice());
|
||||
addr_key.put_slice(keccak256(slot.key).as_slice());
|
||||
let _ = tx.send((addr_key, CompactU256::from(slot.value)));
|
||||
}
|
||||
_ => {
|
||||
// clear table, load all accounts and hash it
|
||||
tx.clear::<tables::HashedStorages>()?;
|
||||
});
|
||||
|
||||
(None, None)
|
||||
}
|
||||
};
|
||||
|
||||
let mut keccak_address = None;
|
||||
|
||||
let mut hashed_batch = BTreeMap::new();
|
||||
let mut remaining = self.commit_threshold as usize;
|
||||
{
|
||||
let mut storage = tx.cursor_dup_read::<tables::PlainStorageState>()?;
|
||||
while !remaining.is_zero() {
|
||||
hashed_batch.extend(
|
||||
storage
|
||||
.walk_dup(current_key, current_subkey)?
|
||||
.take(remaining)
|
||||
.map(|res| {
|
||||
res.map(|(address, slot)| {
|
||||
// Address caching for the first iteration when current_key
|
||||
// is None
|
||||
let keccak_address =
|
||||
if let Some(keccak_address) = keccak_address {
|
||||
keccak_address
|
||||
} else {
|
||||
keccak256(address)
|
||||
};
|
||||
|
||||
// TODO cache map keccak256(slot.key) ?
|
||||
((keccak_address, keccak256(slot.key)), slot.value)
|
||||
})
|
||||
})
|
||||
.collect::<Result<BTreeMap<_, _>, _>>()?,
|
||||
);
|
||||
|
||||
remaining = self.commit_threshold as usize - hashed_batch.len();
|
||||
|
||||
if let Some((address, slot)) = storage.next_dup()? {
|
||||
// There's still some remaining elements on this key, so we need to save
|
||||
// the cursor position for the next
|
||||
// iteration
|
||||
(current_key, current_subkey) = (Some(address), Some(slot.key));
|
||||
} else {
|
||||
// Go to the next key
|
||||
(current_key, current_subkey) = storage
|
||||
.next_no_dup()?
|
||||
.map(|(key, storage_entry)| (key, storage_entry.key))
|
||||
.unzip();
|
||||
|
||||
// Cache keccak256(address) for the next key if it exists
|
||||
if let Some(address) = current_key {
|
||||
keccak_address = Some(keccak256(address));
|
||||
} else {
|
||||
// We have reached the end of table
|
||||
break
|
||||
}
|
||||
}
|
||||
// Flush to ETL when channels length reaches MAXIMUM_CHANNELS
|
||||
if !channels.is_empty() && channels.len() % MAXIMUM_CHANNELS == 0 {
|
||||
collect(&mut channels, &mut collector)?;
|
||||
}
|
||||
}
|
||||
|
||||
// iterate and put presorted hashed slots
|
||||
hashed_batch.into_iter().try_for_each(|((addr, key), value)| {
|
||||
tx.put::<tables::HashedStorages>(addr, StorageEntry { key, value })
|
||||
})?;
|
||||
collect(&mut channels, &mut collector)?;
|
||||
|
||||
if current_key.is_some() {
|
||||
let checkpoint = input.checkpoint().with_storage_hashing_stage_checkpoint(
|
||||
StorageHashingCheckpoint {
|
||||
address: current_key,
|
||||
storage: current_subkey,
|
||||
block_range: CheckpointBlockRange { from: from_block, to: to_block },
|
||||
progress: stage_checkpoint_progress(provider)?,
|
||||
let mut cursor = tx.cursor_dup_write::<tables::HashedStorages>()?;
|
||||
for item in collector.iter()? {
|
||||
let (addr_key, value) = item?;
|
||||
cursor.append_dup(
|
||||
B256::from_slice(&addr_key[..32]),
|
||||
StorageEntry {
|
||||
key: B256::from_slice(&addr_key[32..]),
|
||||
value: CompactU256::decompress(value)?.into(),
|
||||
},
|
||||
);
|
||||
|
||||
return Ok(ExecOutput { checkpoint, done: false })
|
||||
)?;
|
||||
}
|
||||
} else {
|
||||
// Aggregate all changesets and make list of storages that have been
|
||||
@ -212,6 +172,21 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
|
||||
}
|
||||
}
|
||||
|
||||
/// Flushes channels hashes to ETL collector.
|
||||
fn collect(
|
||||
channels: &mut Vec<Receiver<(Vec<u8>, CompactU256)>>,
|
||||
collector: &mut Collector<Vec<u8>, CompactU256>,
|
||||
) -> Result<(), StageError> {
|
||||
for channel in channels.iter_mut() {
|
||||
while let Ok((key, v)) = channel.recv() {
|
||||
collector.insert(key, v)?;
|
||||
}
|
||||
}
|
||||
debug!(target: "sync::stages::hashing_storage", "Hashed {} entries", collector.len());
|
||||
channels.clear();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn stage_checkpoint_progress<DB: Database>(
|
||||
provider: &DatabaseProviderRW<DB>,
|
||||
) -> ProviderResult<EntitiesCheckpoint> {
|
||||
@ -231,14 +206,14 @@ mod tests {
|
||||
use assert_matches::assert_matches;
|
||||
use rand::Rng;
|
||||
use reth_db::{
|
||||
cursor::{DbCursorRO, DbCursorRW},
|
||||
cursor::{DbCursorRW, DbDupCursorRO},
|
||||
models::StoredBlockBodyIndices,
|
||||
};
|
||||
use reth_interfaces::test_utils::{
|
||||
generators,
|
||||
generators::{random_block_range, random_contract_account_range},
|
||||
};
|
||||
use reth_primitives::{stage::StageUnitCheckpoint, Address, SealedBlock, B256, U256};
|
||||
use reth_primitives::{Address, SealedBlock, U256};
|
||||
use reth_provider::providers::StaticFileWriter;
|
||||
|
||||
stage_test_suite_ext!(StorageHashingTestRunner, storage_hashing);
|
||||
@ -310,156 +285,21 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn execute_clean_storage_hashing_with_commit_threshold() {
|
||||
let (previous_stage, stage_progress) = (500, 100);
|
||||
// Set up the runner
|
||||
let mut runner = StorageHashingTestRunner::default();
|
||||
runner.set_clean_threshold(1);
|
||||
runner.set_commit_threshold(500);
|
||||
|
||||
let mut input = ExecInput {
|
||||
target: Some(previous_stage),
|
||||
checkpoint: Some(StageCheckpoint::new(stage_progress)),
|
||||
};
|
||||
|
||||
runner.seed_execution(input).expect("failed to seed execution");
|
||||
|
||||
// first run, hash first half of storages.
|
||||
let rx = runner.execute(input);
|
||||
let result = rx.await.unwrap();
|
||||
|
||||
let (progress_address, progress_key) = runner
|
||||
.db
|
||||
.query(|tx| {
|
||||
let (address, entry) = tx
|
||||
.cursor_read::<tables::PlainStorageState>()?
|
||||
.walk(None)?
|
||||
.nth(500)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
Ok((address, entry.key))
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
assert_matches!(
|
||||
result,
|
||||
Ok(ExecOutput {
|
||||
checkpoint: StageCheckpoint {
|
||||
block_number: 100,
|
||||
stage_checkpoint: Some(StageUnitCheckpoint::Storage(StorageHashingCheckpoint {
|
||||
address: Some(address),
|
||||
storage: Some(storage),
|
||||
block_range: CheckpointBlockRange {
|
||||
from: 101,
|
||||
to: 500,
|
||||
},
|
||||
progress: EntitiesCheckpoint {
|
||||
processed: 500,
|
||||
total
|
||||
}
|
||||
}))
|
||||
},
|
||||
done: false
|
||||
}) if address == progress_address && storage == progress_key &&
|
||||
total == runner.db.table::<tables::PlainStorageState>().unwrap().len() as u64
|
||||
);
|
||||
assert_eq!(runner.db.table::<tables::HashedStorages>().unwrap().len(), 500);
|
||||
|
||||
// second run with commit threshold of 2 to check if subkey is set.
|
||||
runner.set_commit_threshold(2);
|
||||
let result = result.unwrap();
|
||||
input.checkpoint = Some(result.checkpoint);
|
||||
let rx = runner.execute(input);
|
||||
let result = rx.await.unwrap();
|
||||
|
||||
let (progress_address, progress_key) = runner
|
||||
.db
|
||||
.query(|tx| {
|
||||
let (address, entry) = tx
|
||||
.cursor_read::<tables::PlainStorageState>()?
|
||||
.walk(None)?
|
||||
.nth(502)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
Ok((address, entry.key))
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
assert_matches!(
|
||||
result,
|
||||
Ok(ExecOutput {
|
||||
checkpoint: StageCheckpoint {
|
||||
block_number: 100,
|
||||
stage_checkpoint: Some(StageUnitCheckpoint::Storage(
|
||||
StorageHashingCheckpoint {
|
||||
address: Some(address),
|
||||
storage: Some(storage),
|
||||
block_range: CheckpointBlockRange {
|
||||
from: 101,
|
||||
to: 500,
|
||||
},
|
||||
progress: EntitiesCheckpoint {
|
||||
processed: 502,
|
||||
total
|
||||
}
|
||||
}
|
||||
))
|
||||
},
|
||||
done: false
|
||||
}) if address == progress_address && storage == progress_key &&
|
||||
total == runner.db.table::<tables::PlainStorageState>().unwrap().len() as u64
|
||||
);
|
||||
assert_eq!(runner.db.table::<tables::HashedStorages>().unwrap().len(), 502);
|
||||
|
||||
// third last run, hash rest of storages.
|
||||
runner.set_commit_threshold(1000);
|
||||
input.checkpoint = Some(result.unwrap().checkpoint);
|
||||
let rx = runner.execute(input);
|
||||
let result = rx.await.unwrap();
|
||||
|
||||
assert_matches!(
|
||||
result,
|
||||
Ok(ExecOutput {
|
||||
checkpoint: StageCheckpoint {
|
||||
block_number: 500,
|
||||
stage_checkpoint: Some(StageUnitCheckpoint::Storage(
|
||||
StorageHashingCheckpoint {
|
||||
address: None,
|
||||
storage: None,
|
||||
block_range: CheckpointBlockRange {
|
||||
from: 0,
|
||||
to: 0,
|
||||
},
|
||||
progress: EntitiesCheckpoint {
|
||||
processed,
|
||||
total
|
||||
}
|
||||
}
|
||||
))
|
||||
},
|
||||
done: true
|
||||
}) if processed == total &&
|
||||
total == runner.db.table::<tables::PlainStorageState>().unwrap().len() as u64
|
||||
);
|
||||
assert_eq!(
|
||||
runner.db.table::<tables::HashedStorages>().unwrap().len(),
|
||||
runner.db.table::<tables::PlainStorageState>().unwrap().len()
|
||||
);
|
||||
|
||||
// Validate the stage execution
|
||||
assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
|
||||
}
|
||||
|
||||
struct StorageHashingTestRunner {
|
||||
db: TestStageDB,
|
||||
commit_threshold: u64,
|
||||
clean_threshold: u64,
|
||||
etl_config: EtlConfig,
|
||||
}
|
||||
|
||||
impl Default for StorageHashingTestRunner {
|
||||
fn default() -> Self {
|
||||
Self { db: TestStageDB::default(), commit_threshold: 1000, clean_threshold: 1000 }
|
||||
Self {
|
||||
db: TestStageDB::default(),
|
||||
commit_threshold: 1000,
|
||||
clean_threshold: 1000,
|
||||
etl_config: EtlConfig::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -474,6 +314,7 @@ mod tests {
|
||||
Self::S {
|
||||
commit_threshold: self.commit_threshold,
|
||||
clean_threshold: self.clean_threshold,
|
||||
etl_config: self.etl_config.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user