refactor: unify code paths for trie unwind (#12741)

This commit is contained in:
Arsenii Kulikov
2024-11-21 19:20:29 +04:00
committed by GitHub
parent 9d3f8cc6a2
commit c73dadacb2
2 changed files with 93 additions and 157 deletions

View File

@ -25,8 +25,7 @@ use reth_network_p2p::{headers::client::HeadersClient, EthBlockClient};
use reth_node_api::NodeTypesWithDBAdapter;
use reth_node_ethereum::EthExecutorProvider;
use reth_provider::{
providers::ProviderNodeTypes, BlockExecutionWriter, ChainSpecProvider, ProviderFactory,
StageCheckpointReader,
providers::ProviderNodeTypes, ChainSpecProvider, ProviderFactory, StageCheckpointReader,
};
use reth_prune::PruneModes;
use reth_stages::{
@ -230,11 +229,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
trace!(target: "reth::cli", from = next_block, to = target_block, tip = ?target_block_hash, ?result, "Pipeline finished");
// Unwind the pipeline without committing.
{
provider_factory
.provider_rw()?
.take_block_and_execution_range(next_block..=target_block)?;
}
provider_factory.provider_rw()?.unwind_trie_state_range(next_block..=target_block)?;
// Update latest block
current_max_block = target_block;

View File

@ -243,6 +243,95 @@ impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
}
}
impl<TX: DbTx + DbTxMut + 'static, N: ProviderNodeTypes> DatabaseProvider<TX, N> {
/// Unwinds trie state for the given range.
///
/// This includes calculating the resulted state root and comparing it with the parent block
/// state root.
pub fn unwind_trie_state_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let changed_accounts = self
.tx
.cursor_read::<tables::AccountChangeSets>()?
.walk_range(range.clone())?
.collect::<Result<Vec<_>, _>>()?;
// Unwind account hashes. Add changed accounts to account prefix set.
let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
let mut destroyed_accounts = HashSet::default();
for (hashed_address, account) in hashed_addresses {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
if account.is_none() {
destroyed_accounts.insert(hashed_address);
}
}
// Unwind account history indices.
self.unwind_account_history_indices(changed_accounts.iter())?;
let storage_range = BlockNumberAddress::range(range.clone());
let changed_storages = self
.tx
.cursor_read::<tables::StorageChangeSets>()?
.walk_range(storage_range)?
.collect::<Result<Vec<_>, _>>()?;
// Unwind storage hashes. Add changed account and storage keys to corresponding prefix
// sets.
let mut storage_prefix_sets = HashMap::<B256, PrefixSet>::default();
let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
for (hashed_address, hashed_slots) in storage_entries {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
for slot in hashed_slots {
storage_prefix_set.insert(Nibbles::unpack(slot));
}
storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze());
}
// Unwind storage history indices.
self.unwind_storage_history_indices(changed_storages.iter().copied())?;
// Calculate the reverted merkle root.
// This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
// are pre-loaded.
let prefix_sets = TriePrefixSets {
account_prefix_set: account_prefix_set.freeze(),
storage_prefix_sets,
destroyed_accounts,
};
let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx)
.with_prefix_sets(prefix_sets)
.root_with_updates()
.map_err(Into::<reth_db::DatabaseError>::into)?;
let parent_number = range.start().saturating_sub(1);
let parent_state_root = self
.header_by_number(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
.state_root;
// state root should be always correct as we are reverting state.
// but for sake of double verification we will check it again.
if new_state_root != parent_state_root {
let parent_hash = self
.block_hash(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch {
root: GotExpected { got: new_state_root, expected: parent_state_root },
block_number: parent_number,
block_hash: parent_hash,
})))
}
self.write_trie_updates(&trie_updates)?;
Ok(())
}
}
impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
fn try_into_history_at_block(
self,
@ -2913,81 +3002,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: ProviderNodeTypes + 'static> BlockExecutio
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Chain> {
let changed_accounts = self
.tx
.cursor_read::<tables::AccountChangeSets>()?
.walk_range(range.clone())?
.collect::<Result<Vec<_>, _>>()?;
// Unwind account hashes. Add changed accounts to account prefix set.
let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
let mut destroyed_accounts = HashSet::default();
for (hashed_address, account) in hashed_addresses {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
if account.is_none() {
destroyed_accounts.insert(hashed_address);
}
}
// Unwind account history indices.
self.unwind_account_history_indices(changed_accounts.iter())?;
let storage_range = BlockNumberAddress::range(range.clone());
let changed_storages = self
.tx
.cursor_read::<tables::StorageChangeSets>()?
.walk_range(storage_range)?
.collect::<Result<Vec<_>, _>>()?;
// Unwind storage hashes. Add changed account and storage keys to corresponding prefix
// sets.
let mut storage_prefix_sets = HashMap::<B256, PrefixSet>::default();
let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
for (hashed_address, hashed_slots) in storage_entries {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
for slot in hashed_slots {
storage_prefix_set.insert(Nibbles::unpack(slot));
}
storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze());
}
// Unwind storage history indices.
self.unwind_storage_history_indices(changed_storages.iter().copied())?;
// Calculate the reverted merkle root.
// This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
// are pre-loaded.
let prefix_sets = TriePrefixSets {
account_prefix_set: account_prefix_set.freeze(),
storage_prefix_sets,
destroyed_accounts,
};
let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx)
.with_prefix_sets(prefix_sets)
.root_with_updates()
.map_err(Into::<reth_db::DatabaseError>::into)?;
let parent_number = range.start().saturating_sub(1);
let parent_state_root = self
.header_by_number(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
.state_root;
// state root should be always correct as we are reverting state.
// but for sake of double verification we will check it again.
if new_state_root != parent_state_root {
let parent_hash = self
.block_hash(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch {
root: GotExpected { got: new_state_root, expected: parent_state_root },
block_number: parent_number,
block_hash: parent_hash,
})))
}
self.write_trie_updates(&trie_updates)?;
self.unwind_trie_state_range(range.clone())?;
// get blocks
let blocks = self.take_block_range(range.clone())?;
@ -3012,81 +3027,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: ProviderNodeTypes + 'static> BlockExecutio
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let changed_accounts = self
.tx
.cursor_read::<tables::AccountChangeSets>()?
.walk_range(range.clone())?
.collect::<Result<Vec<_>, _>>()?;
// Unwind account hashes. Add changed accounts to account prefix set.
let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
let mut destroyed_accounts = HashSet::default();
for (hashed_address, account) in hashed_addresses {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
if account.is_none() {
destroyed_accounts.insert(hashed_address);
}
}
// Unwind account history indices.
self.unwind_account_history_indices(changed_accounts.iter())?;
let storage_range = BlockNumberAddress::range(range.clone());
let changed_storages = self
.tx
.cursor_read::<tables::StorageChangeSets>()?
.walk_range(storage_range)?
.collect::<Result<Vec<_>, _>>()?;
// Unwind storage hashes. Add changed account and storage keys to corresponding prefix
// sets.
let mut storage_prefix_sets = HashMap::<B256, PrefixSet>::default();
let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
for (hashed_address, hashed_slots) in storage_entries {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
for slot in hashed_slots {
storage_prefix_set.insert(Nibbles::unpack(slot));
}
storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze());
}
// Unwind storage history indices.
self.unwind_storage_history_indices(changed_storages.iter().copied())?;
// Calculate the reverted merkle root.
// This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
// are pre-loaded.
let prefix_sets = TriePrefixSets {
account_prefix_set: account_prefix_set.freeze(),
storage_prefix_sets,
destroyed_accounts,
};
let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx)
.with_prefix_sets(prefix_sets)
.root_with_updates()
.map_err(Into::<reth_db::DatabaseError>::into)?;
let parent_number = range.start().saturating_sub(1);
let parent_state_root = self
.header_by_number(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
.state_root;
// state root should be always correct as we are reverting state.
// but for sake of double verification we will check it again.
if new_state_root != parent_state_root {
let parent_hash = self
.block_hash(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch {
root: GotExpected { got: new_state_root, expected: parent_state_root },
block_number: parent_number,
block_hash: parent_hash,
})))
}
self.write_trie_updates(&trie_updates)?;
self.unwind_trie_state_range(range.clone())?;
// get blocks
let blocks = self.take_block_range(range.clone())?;