chore: apply same impl order (#5639)

Author:    Matthias Seitz
Date:      2023-11-30 16:11:03 +01:00
Committer: GitHub
Parent:    c29fcf3b16
Commit:    02a07f6480
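This is a pure reorder: no method body changes, the impl blocks are just rearranged so methods appear in the same order as their trait declares them. Below is a minimal, self-contained sketch of that convention; `StageId`, `ProviderResult`, and `Provider` here are stubbed stand-ins rather than the real reth definitions, and the signatures are abbreviated from the diff that follows.

// Hedged sketch of the impl-order convention this commit applies.
type ProviderResult<T> = Result<T, ()>;
struct StageId;
struct Provider;

trait StageCheckpointWriter {
    /// Save stage checkpoint.
    fn save_stage_checkpoint(&self, id: StageId) -> ProviderResult<()>;
    /// Save stage checkpoint progress.
    fn save_stage_checkpoint_progress(&self, id: StageId, checkpoint: Vec<u8>) -> ProviderResult<()>;
}

// After this commit, the impl lists its methods in trait-declaration order:
// `save_stage_checkpoint` first, then `save_stage_checkpoint_progress`.
impl StageCheckpointWriter for Provider {
    fn save_stage_checkpoint(&self, _id: StageId) -> ProviderResult<()> {
        Ok(())
    }
    fn save_stage_checkpoint_progress(&self, _id: StageId, _checkpoint: Vec<u8>) -> ProviderResult<()> {
        Ok(())
    }
}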

@@ -1574,15 +1574,6 @@ impl<TX: DbTx> StageCheckpointReader for DatabaseProvider<TX> {
 }
 
 impl<TX: DbTxMut> StageCheckpointWriter for DatabaseProvider<TX> {
-    /// Save stage checkpoint progress.
-    fn save_stage_checkpoint_progress(
-        &self,
-        id: StageId,
-        checkpoint: Vec<u8>,
-    ) -> ProviderResult<()> {
-        Ok(self.tx.put::<tables::SyncStageProgress>(id.to_string(), checkpoint)?)
-    }
-
     /// Save stage checkpoint.
     fn save_stage_checkpoint(
         &self,
@@ -1592,6 +1583,15 @@ impl<TX: DbTxMut> StageCheckpointWriter for DatabaseProvider<TX> {
         Ok(self.tx.put::<tables::SyncStage>(id.to_string(), checkpoint)?)
     }
 
+    /// Save stage checkpoint progress.
+    fn save_stage_checkpoint_progress(
+        &self,
+        id: StageId,
+        checkpoint: Vec<u8>,
+    ) -> ProviderResult<()> {
+        Ok(self.tx.put::<tables::SyncStageProgress>(id.to_string(), checkpoint)?)
+    }
+
     fn update_pipeline_stages(
         &self,
         block_number: BlockNumber,
@@ -1678,76 +1678,73 @@ impl<TX: DbTx> StorageReader for DatabaseProvider<TX> {
 }
 
 impl<TX: DbTxMut + DbTx> HashingWriter for DatabaseProvider<TX> {
-    fn insert_hashes(
+    fn unwind_account_hashing(
         &self,
         range: RangeInclusive<BlockNumber>,
-        end_block_hash: B256,
-        expected_state_root: B256,
-    ) -> ProviderResult<()> {
-        // Initialize prefix sets.
-        let mut account_prefix_set = PrefixSetMut::default();
-        let mut storage_prefix_set: HashMap<B256, PrefixSetMut> = HashMap::default();
-        let mut destroyed_accounts = HashSet::default();
-
-        let mut durations_recorder = metrics::DurationsRecorder::default();
-
-        // storage hashing stage
-        {
-            let lists = self.changed_storages_with_range(range.clone())?;
-            let storages = self.plain_state_storages(lists)?;
-            let storage_entries = self.insert_storage_for_hashing(storages)?;
-            for (hashed_address, hashed_slots) in storage_entries {
-                account_prefix_set.insert(Nibbles::unpack(hashed_address));
-                for slot in hashed_slots {
-                    storage_prefix_set
-                        .entry(hashed_address)
-                        .or_default()
-                        .insert(Nibbles::unpack(slot));
-                }
-            }
-        }
-        durations_recorder.record_relative(metrics::Action::InsertStorageHashing);
-
-        // account hashing stage
-        {
-            let lists = self.changed_accounts_with_range(range.clone())?;
-            let accounts = self.basic_accounts(lists)?;
-            let hashed_addresses = self.insert_account_for_hashing(accounts)?;
-            for (hashed_address, account) in hashed_addresses {
-                account_prefix_set.insert(Nibbles::unpack(hashed_address));
-                if account.is_none() {
-                    destroyed_accounts.insert(hashed_address);
-                }
-            }
-        }
-        durations_recorder.record_relative(metrics::Action::InsertAccountHashing);
-
-        // merkle tree
-        {
-            // This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
-            // are pre-loaded.
-            let (state_root, trie_updates) = StateRoot::new(&self.tx)
-                .with_changed_account_prefixes(account_prefix_set.freeze())
-                .with_changed_storage_prefixes(
-                    storage_prefix_set.into_iter().map(|(k, v)| (k, v.freeze())).collect(),
-                )
-                .with_destroyed_accounts(destroyed_accounts)
-                .root_with_updates()
-                .map_err(Into::<reth_db::DatabaseError>::into)?;
-            if state_root != expected_state_root {
-                return Err(ProviderError::StateRootMismatch(Box::new(RootMismatch {
-                    root: GotExpected { got: state_root, expected: expected_state_root },
-                    block_number: *range.end(),
-                    block_hash: end_block_hash,
-                })))
-            }
-            trie_updates.flush(&self.tx)?;
-        }
-        durations_recorder.record_relative(metrics::Action::InsertMerkleTree);
-
-        debug!(target: "providers::db", ?range, actions = ?durations_recorder.actions, "Inserted hashes");
-
-        Ok(())
+    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
+        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccount>()?;
+
+        // Aggregate all block changesets and make a list of accounts that have been changed.
+        let hashed_accounts = self
+            .tx
+            .cursor_read::<tables::AccountChangeSet>()?
+            .walk_range(range)?
+            .collect::<Result<Vec<_>, _>>()?
+            .into_iter()
+            .rev()
+            // fold all account to get the old balance/nonces and account that needs to be removed
+            .fold(
+                BTreeMap::new(),
+                |mut accounts: BTreeMap<Address, Option<Account>>, (_, account_before)| {
+                    accounts.insert(account_before.address, account_before.info);
+                    accounts
+                },
+            )
+            .into_iter()
+            // hash addresses and collect it inside sorted BTreeMap.
+            // We are doing keccak only once per address.
+            .map(|(address, account)| (keccak256(address), account))
+            .collect::<BTreeMap<_, _>>();
+
+        hashed_accounts
+            .iter()
+            // Apply values to HashedState (if Account is None remove it);
+            .try_for_each(|(hashed_address, account)| -> ProviderResult<()> {
+                if let Some(account) = account {
+                    hashed_accounts_cursor.upsert(*hashed_address, *account)?;
+                } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
+                    hashed_accounts_cursor.delete_current()?;
+                }
+                Ok(())
+            })?;
+
+        Ok(hashed_accounts)
+    }
+
+    fn insert_account_for_hashing(
+        &self,
+        accounts: impl IntoIterator<Item = (Address, Option<Account>)>,
+    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
+        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccount>()?;
+
+        let hashed_accounts = accounts.into_iter().fold(
+            BTreeMap::new(),
+            |mut map: BTreeMap<B256, Option<Account>>, (address, account)| {
+                map.insert(keccak256(address), account);
+                map
+            },
+        );
+
+        hashed_accounts.iter().try_for_each(|(hashed_address, account)| -> ProviderResult<()> {
+            if let Some(account) = account {
+                hashed_accounts_cursor.upsert(*hashed_address, *account)?
+            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
+                hashed_accounts_cursor.delete_current()?;
+            }
+            Ok(())
+        })?;
+
+        Ok(hashed_accounts)
     }
 
     fn unwind_storage_hashing(
@@ -1848,73 +1845,76 @@ impl<TX: DbTxMut + DbTx> HashingWriter for DatabaseProvider<TX> {
         Ok(hashed_storage_keys)
     }
 
-    fn unwind_account_hashing(
+    fn insert_hashes(
         &self,
         range: RangeInclusive<BlockNumber>,
-    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
-        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccount>()?;
-
-        // Aggregate all block changesets and make a list of accounts that have been changed.
-        let hashed_accounts = self
-            .tx
-            .cursor_read::<tables::AccountChangeSet>()?
-            .walk_range(range)?
-            .collect::<Result<Vec<_>, _>>()?
-            .into_iter()
-            .rev()
-            // fold all account to get the old balance/nonces and account that needs to be removed
-            .fold(
-                BTreeMap::new(),
-                |mut accounts: BTreeMap<Address, Option<Account>>, (_, account_before)| {
-                    accounts.insert(account_before.address, account_before.info);
-                    accounts
-                },
-            )
-            .into_iter()
-            // hash addresses and collect it inside sorted BTreeMap.
-            // We are doing keccak only once per address.
-            .map(|(address, account)| (keccak256(address), account))
-            .collect::<BTreeMap<_, _>>();
-
-        hashed_accounts
-            .iter()
-            // Apply values to HashedState (if Account is None remove it);
-            .try_for_each(|(hashed_address, account)| -> ProviderResult<()> {
-                if let Some(account) = account {
-                    hashed_accounts_cursor.upsert(*hashed_address, *account)?;
-                } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
-                    hashed_accounts_cursor.delete_current()?;
-                }
-                Ok(())
-            })?;
-
-        Ok(hashed_accounts)
-    }
-
-    fn insert_account_for_hashing(
-        &self,
-        accounts: impl IntoIterator<Item = (Address, Option<Account>)>,
-    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
-        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccount>()?;
-
-        let hashed_accounts = accounts.into_iter().fold(
-            BTreeMap::new(),
-            |mut map: BTreeMap<B256, Option<Account>>, (address, account)| {
-                map.insert(keccak256(address), account);
-                map
-            },
-        );
-
-        hashed_accounts.iter().try_for_each(|(hashed_address, account)| -> ProviderResult<()> {
-            if let Some(account) = account {
-                hashed_accounts_cursor.upsert(*hashed_address, *account)?
-            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
-                hashed_accounts_cursor.delete_current()?;
-            }
-            Ok(())
-        })?;
-
-        Ok(hashed_accounts)
+        end_block_hash: B256,
+        expected_state_root: B256,
+    ) -> ProviderResult<()> {
+        // Initialize prefix sets.
+        let mut account_prefix_set = PrefixSetMut::default();
+        let mut storage_prefix_set: HashMap<B256, PrefixSetMut> = HashMap::default();
+        let mut destroyed_accounts = HashSet::default();
+
+        let mut durations_recorder = metrics::DurationsRecorder::default();
+
+        // storage hashing stage
+        {
+            let lists = self.changed_storages_with_range(range.clone())?;
+            let storages = self.plain_state_storages(lists)?;
+            let storage_entries = self.insert_storage_for_hashing(storages)?;
+            for (hashed_address, hashed_slots) in storage_entries {
+                account_prefix_set.insert(Nibbles::unpack(hashed_address));
+                for slot in hashed_slots {
+                    storage_prefix_set
+                        .entry(hashed_address)
+                        .or_default()
+                        .insert(Nibbles::unpack(slot));
+                }
+            }
+        }
+        durations_recorder.record_relative(metrics::Action::InsertStorageHashing);
+
+        // account hashing stage
+        {
+            let lists = self.changed_accounts_with_range(range.clone())?;
+            let accounts = self.basic_accounts(lists)?;
+            let hashed_addresses = self.insert_account_for_hashing(accounts)?;
+            for (hashed_address, account) in hashed_addresses {
+                account_prefix_set.insert(Nibbles::unpack(hashed_address));
+                if account.is_none() {
+                    destroyed_accounts.insert(hashed_address);
+                }
+            }
+        }
+        durations_recorder.record_relative(metrics::Action::InsertAccountHashing);
+
+        // merkle tree
+        {
+            // This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
+            // are pre-loaded.
+            let (state_root, trie_updates) = StateRoot::new(&self.tx)
+                .with_changed_account_prefixes(account_prefix_set.freeze())
+                .with_changed_storage_prefixes(
+                    storage_prefix_set.into_iter().map(|(k, v)| (k, v.freeze())).collect(),
+                )
+                .with_destroyed_accounts(destroyed_accounts)
+                .root_with_updates()
+                .map_err(Into::<reth_db::DatabaseError>::into)?;
+            if state_root != expected_state_root {
+                return Err(ProviderError::StateRootMismatch(Box::new(RootMismatch {
+                    root: GotExpected { got: state_root, expected: expected_state_root },
+                    block_number: *range.end(),
+                    block_hash: end_block_hash,
+                })))
+            }
+            trie_updates.flush(&self.tx)?;
+        }
+        durations_recorder.record_relative(metrics::Action::InsertMerkleTree);
+
+        debug!(target: "providers::db", ?range, actions = ?durations_recorder.actions, "Inserted hashes");
+
+        Ok(())
     }
 }
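Net effect of the two `HashingWriter` hunks: `unwind_account_hashing` and `insert_account_for_hashing` move ahead of `unwind_storage_hashing`, while `insert_hashes` moves to the end of the impl block. Every method body is unchanged, so the reorder is behavior-neutral and presumably mirrors the order in which the `HashingWriter` trait declares these methods.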